# -*- coding: utf-8 -*-
import re
import sys
import os
import ast
import json
# Добавляем директорию src в путь для импорта
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
# ============================================================================
# КОНСТАНТЫ И СЛОВАРИ ДЛЯ РАСПОЗНАВАНИЯ
# ============================================================================
# Универсальный паттерн для идентификаторов (кириллица + латиница + цифры + _)
ID_PATTERN = r'[а-яА-ЯёЁa-zA-Z_][а-яА-ЯёЁa-zA-Z0-9_]*'
ID_PATTERN_UNICODE = r'[\w\u0400-\u04FF]+'
# Словари для синонимов операций
SELECT_KEYWORDS = [
'покажи', 'выведи', 'выбери', 'найди', 'получи', 'достань', 'дай',
'отобрази', 'выведи', 'верни', 'извлеки', 'прочитай', 'читай',
'show', 'get', 'find', 'select', 'fetch', 'retrieve', 'display', 'list'
]
INSERT_KEYWORDS = [
'добавь', 'вставь', 'создай', 'запиши', 'внеси', 'сохрани',
'add', 'insert', 'create', 'save', 'store', 'put', 'new'
]
UPDATE_KEYWORDS = [
'обнови', 'измени', 'поменяй', 'установи', 'замени', 'правь', 'отредактируй',
'update', 'modify', 'change', 'set', 'edit', 'alter', 'replace'
]
DELETE_KEYWORDS = [
'удали', 'убери', 'сотри', 'очисти', 'уничтожь', 'вычеркни', 'убрать',
'delete', 'remove', 'erase', 'clear', 'destroy', 'drop', 'wipe'
]
# Словарь для преобразования русских слов в названия таблиц
TABLE_SYNONYMS = {
# Пользователи
'пользователь': 'users', 'пользователя': 'users', 'пользователей': 'users',
'пользователи': 'users', 'юзер': 'users', 'юзера': 'users', 'юзеров': 'users',
'пользователю': 'users', 'пользователям': 'users',
'user': 'users', 'users': 'users',
# Сотрудники
'сотрудник': 'employees', 'сотрудника': 'employees', 'сотрудников': 'employees',
'сотрудники': 'employees', 'сотрудникам': 'employees', 'сотрудниками': 'employees',
'работник': 'employees', 'работника': 'employees', 'работников': 'employees',
'работники': 'employees', 'employee': 'employees', 'employees': 'employees',
# Клиенты
'клиент': 'clients', 'клиента': 'clients', 'клиентов': 'clients',
'клиенты': 'clients', 'клиентам': 'clients', 'клиентами': 'clients',
'customer': 'clients', 'customers': 'clients',
# Заказы
'заказ': 'orders', 'заказа': 'orders', 'заказов': 'orders', 'заказы': 'orders',
'заказам': 'orders', 'заказами': 'orders',
'order': 'orders', 'orders': 'orders',
# Товары/Продукты
'товар': 'products', 'товара': 'products', 'товаров': 'products', 'товары': 'products',
'товарам': 'products', 'товарами': 'products',
'продукт': 'products', 'продукта': 'products', 'продуктов': 'products', 'продукты': 'products',
'product': 'products', 'products': 'products',
# Записи
'запись': 'records', 'записи': 'records', 'записей': 'records',
'записям': 'records', 'записями': 'records',
'record': 'records', 'records': 'records',
# Документы
'документ': 'documents', 'документа': 'documents', 'документов': 'documents',
'документы': 'documents', 'документам': 'documents', 'документами': 'documents',
'document': 'documents', 'documents': 'documents',
# Посты/Статьи
'пост': 'posts', 'поста': 'posts', 'постов': 'posts', 'посты': 'posts',
'статья': 'articles', 'статьи': 'articles', 'статей': 'articles',
'post': 'posts', 'posts': 'posts', 'article': 'articles', 'articles': 'articles',
# Комментарии
'комментарий': 'comments', 'комментария': 'comments', 'комментариев': 'comments',
'комментарии': 'comments', 'комментариям': 'comments',
'comment': 'comments', 'comments': 'comments',
# Категории
'категория': 'categories', 'категории': 'categories', 'категорий': 'categories',
'категориям': 'categories', 'категориями': 'categories',
'category': 'categories', 'categories': 'categories',
# Сообщения
'сообщение': 'messages', 'сообщения': 'messages', 'сообщений': 'messages',
'сообщениям': 'messages', 'сообщениями': 'messages',
'message': 'messages', 'messages': 'messages',
# Файлы
'файл': 'files', 'файла': 'files', 'файлов': 'files', 'файлы': 'files',
'файлам': 'files', 'файлами': 'files',
'file': 'files', 'files': 'files',
# Задачи
'задача': 'tasks', 'задачи': 'tasks', 'задач': 'tasks',
'задачам': 'tasks', 'задачами': 'tasks',
'task': 'tasks', 'tasks': 'tasks',
# Логи
'лог': 'logs', 'лога': 'logs', 'логов': 'logs', 'логи': 'logs',
'логам': 'logs', 'логами': 'logs',
'log': 'logs', 'logs': 'logs',
# Настройки
'настройка': 'settings', 'настройки': 'settings', 'настроек': 'settings',
'настройкам': 'settings', 'настройками': 'settings',
'setting': 'settings', 'settings': 'settings',
}
# Словарь для преобразования русских названий полей в английские
FIELD_SYNONYMS = {
# ID
'ид': 'id', 'идентификатор': 'id', 'айди': 'id', 'номер': 'id',
# Имя
'имя': 'name', 'имени': 'name', 'именем': 'name', 'название': 'name',
'названия': 'name', 'названием': 'name', 'наименование': 'name',
# Фамилия
'фамилия': 'surname', 'фамилии': 'surname', 'фамилией': 'surname',
# Email
'почта': 'email', 'почты': 'email', 'почтой': 'email',
'мейл': 'email', 'мейла': 'email', 'мейлом': 'email',
'емейл': 'email', 'электронная почта': 'email',
# Телефон
'телефон': 'phone', 'телефона': 'phone', 'телефоном': 'phone',
'тел': 'phone', 'номер телефона': 'phone',
# Возраст
'возраст': 'age', 'возраста': 'age', 'возрастом': 'age', 'лет': 'age',
# Должность
'должность': 'position', 'должности': 'position', 'должностью': 'position',
'позиция': 'position', 'позиции': 'position', 'позицией': 'position',
# Статус
'статус': 'status', 'статуса': 'status', 'статусом': 'status',
'состояние': 'status', 'состояния': 'status',
# Дата
'дата': 'date', 'даты': 'date', 'датой': 'date',
'дата создания': 'created_at', 'дата обновления': 'updated_at',
# Цена
'цена': 'price', 'цены': 'price', 'ценой': 'price',
'стоимость': 'price', 'стоимости': 'price', 'стоимостью': 'price',
# Количество
'количество': 'quantity', 'количества': 'quantity', 'количеством': 'quantity',
'кол-во': 'quantity', 'штук': 'quantity',
# Описание
'описание': 'description', 'описания': 'description', 'описанием': 'description',
# Адрес
'адрес': 'address', 'адреса': 'address', 'адресом': 'address',
# Город
'город': 'city', 'города': 'city', 'городом': 'city',
# Страна
'страна': 'country', 'страны': 'country', 'страной': 'country',
# Пароль
'пароль': 'password', 'пароля': 'password', 'паролем': 'password',
# Роль
'роль': 'role', 'роли': 'role', 'ролью': 'role',
# Тип
'тип': 'type', 'типа': 'type', 'типом': 'type',
# Заголовок
'заголовок': 'title', 'заголовка': 'title', 'заголовком': 'title',
# Текст/Содержимое
'текст': 'text', 'текста': 'text', 'текстом': 'text',
'содержимое': 'content', 'содержимого': 'content', 'содержимым': 'content',
# Активен
'активен': 'active', 'активный': 'active', 'активна': 'active',
}
# Операторы сравнения на русском
COMPARISON_OPERATORS = {
'равно': '=', 'равен': '=', 'равна': '=', 'является': '=', '=': '=', '==': '=',
'не равно': '!=', 'не равен': '!=', 'не равна': '!=', '!=': '!=', '<>': '!=',
'больше': '>', 'больше чем': '>', '>': '>',
'меньше': '<', 'меньше чем': '<', '<': '<',
'больше или равно': '>=', 'не меньше': '>=', '>=': '>=',
'меньше или равно': '<=', 'не больше': '<=', '<=': '<=',
'содержит': 'LIKE', 'включает': 'LIKE',
'начинается с': 'STARTS', 'начинается на': 'STARTS',
'заканчивается на': 'ENDS', 'оканчивается на': 'ENDS',
'в списке': 'IN', 'один из': 'IN', 'входит в': 'IN',
'не в списке': 'NOT IN', 'не входит в': 'NOT IN',
'между': 'BETWEEN',
}
def _try_to_int(value):
"""Пытается преобразовать значение в число"""
if value is None:
return None
if isinstance(value, (int, float)):
return value
try:
# Пробуем int
return int(value)
except (ValueError, TypeError):
try:
# Пробуем float
return float(value)
except (ValueError, TypeError):
# Возвращаем строку
return str(value).strip() if value else value
def _normalize_value(value: str):
"""Нормализует значение: убирает кавычки, пробуется конвертировать в число"""
if not value:
return None
value = str(value).strip()
# Убираем кавычки
if (value.startswith('"') and value.endswith('"')) or \
(value.startswith("'") and value.endswith("'")):
value = value[1:-1]
# Преобразование булевых значений
lower_val = value.lower()
if lower_val in ('true', 'да', 'yes', 'истина', '1'):
return True
if lower_val in ('false', 'нет', 'no', 'ложь', '0'):
return False
if lower_val in ('null', 'none', 'nil', 'пусто'):
return None
return _try_to_int(value)
def _is_latin(text: str) -> bool:
"""Проверяет, состоит ли текст только из латинских букв, цифр и подчёркивания"""
return bool(re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', text))
def _normalize_field_name(field: str) -> str:
"""
Нормализует название поля.
- Если поле на латинице (age, name, etc.) - оставляет как есть
- Если поле на кириллице и есть в FIELD_SYNONYMS - НЕ нормализует (для MongoDB)
- Кириллические поля сохраняются для работы с MongoDB коллекциями
"""
if not field:
return field
field = field.strip()
# Если поле на латинице - возвращаем как есть
if _is_latin(field):
return field
# Кириллические поля - оставляем как есть для MongoDB
# Это позволяет работать с коллекциями с полями типа "имя", "возраст"
return field
def _normalize_table_name(table: str) -> str:
"""
Нормализует название таблицы.
- Если таблица на латинице (users, products) - оставляет как есть
- Если таблица на кириллице (Сотрудники) - оставляет как есть для MongoDB
"""
if not table:
return table
table = table.strip()
# Если таблица на латинице - возвращаем как есть
if _is_latin(table):
return table
# Кириллические таблицы - оставляем как есть для MongoDB
return table
def _parse_key_value_pairs(text_blob: str) -> dict:
"""
Парсит строку с парами ключ-значение в словарь.
Поддерживает форматы:
- ключ:значение
- ключ=значение
- ключ 'значение'
- ключ "значение"
- ключ значение (без кавычек, если значение одно слово)
"""
if not text_blob:
return {}
pairs = {}
# Паттерн 1: ключ:значение или ключ=значение (с кавычками или без)
pattern1 = rf"({ID_PATTERN})\s*[:=]\s*(?:'([^']*)'|\"([^\"]*)\"|([^\s,]+))"
for match in re.finditer(pattern1, text_blob, re.IGNORECASE):
key = match.group(1).strip()
value = match.group(2) or match.group(3) or match.group(4)
pairs[_normalize_field_name(key)] = _normalize_value(value)
# Паттерн 2: ключ 'значение' или ключ "значение" (с пробелом вместо :=)
if not pairs:
pattern2 = rf"({ID_PATTERN})\s+['\"]([^'\"]+)['\"]"
for match in re.finditer(pattern2, text_blob, re.IGNORECASE):
key = match.group(1).strip()
value = match.group(2)
pairs[_normalize_field_name(key)] = _normalize_value(value)
return pairs
def _parse_mongo_condition(condition_str: str) -> dict:
"""Парсит MongoDB условия с поддержкой операторов ($gt, $lt, $in, $regex и т.д.)"""
if not condition_str or not condition_str.strip():
return {}
condition = {}
# Попробуем распарсить как JSON если возможно
try:
parsed = json.loads(condition_str)
if isinstance(parsed, dict):
return parsed
except:
pass
# Парсим сложные условия на русском языке
# Формат: "поле оператор значение"
# Операторы с их MongoDB эквивалентами
operators_patterns = [
(r'больше или равно', '$gte'),
(r'меньше или равно', '$lte'),
(r'больше чем', '$gt'),
(r'меньше чем', '$lt'),
(r'не равно', '$ne'),
(r'начинается с', '$regex_start'),
(r'заканчивается на', '$regex_end'),
(r'содержит', '$regex_contains'),
(r'в списке', '$in'),
(r'между', '$between'),
]
for pattern, mongo_op in operators_patterns:
regex = rf'({ID_PATTERN})\s+{pattern}\s+(.+?)(?:\s+и\s+|$|,)'
match = re.search(regex, condition_str, re.IGNORECASE)
if match:
field = _normalize_field_name(match.group(1))
value = match.group(2).strip().strip("'\"")
if mongo_op == '$regex_start':
condition[field] = {'$regex': f'^{value}', '$options': 'i'}
elif mongo_op == '$regex_end':
condition[field] = {'$regex': f'{value}$', '$options': 'i'}
elif mongo_op == '$regex_contains':
condition[field] = {'$regex': value, '$options': 'i'}
elif mongo_op == '$in':
# Парсим список значений
values = [_normalize_value(v.strip()) for v in value.split(',')]
condition[field] = {'$in': values}
elif mongo_op == '$between':
# Парсим диапазон "X и Y"
range_match = re.match(r'(.+?)\s+и\s+(.+)', value)
if range_match:
low = _normalize_value(range_match.group(1))
high = _normalize_value(range_match.group(2))
condition[field] = {'$gte': low, '$lte': high}
else:
condition[field] = {mongo_op: _normalize_value(value)}
# Если ничего не нашли, пробуем простое равенство
if not condition:
# Формат: ключ:значение или ключ=значение
condition = _parse_key_value_pairs(condition_str)
# Если всё ещё пусто, пробуем формат "ключ значение"
if not condition:
simple_match = re.match(rf'({ID_PATTERN})\s+([^\s,]+)', condition_str.strip())
if simple_match:
key = _normalize_field_name(simple_match.group(1))
value = _normalize_value(simple_match.group(2))
condition[key] = value
return condition
def _extract_table_name(query_string: str) -> str:
"""
Извлекает название таблицы/коллекции из запроса.
Поддерживает множество форматов указания таблицы.
"""
patterns = [
# Явные указания с кавычками
r"\b(?:из|в|от|для|к|на)\s+(?:таблиц[ыуе]|коллекци[июей]|базы?|набора?)\s+['\"]([^'\"]+)['\"]",
r"\b(?:table|collection|from|into|to)\s+['\"]([^'\"]+)['\"]",
# Явные указания без кавычек
rf"\b(?:из|в|от|для|к|на)\s+(?:таблиц[ыуе]|коллекци[июей])\s+({ID_PATTERN})",
rf"\b(?:table|collection|from|into|to)\s+({ID_PATTERN})",
# Неявные указания (после "в" или "из") - требуем границу слова
rf"\b(?:из|в)\s+({ID_PATTERN})",
]
for pattern in patterns:
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
table = match.group(1).strip()
# Проверяем, что это не служебное слово
service_words = {'все', 'всех', 'каждый', 'каждого', 'где', 'который', 'которого',
'который', 'table', 'collection', 'database', 'all', 'each', 'where'}
if table.lower() not in service_words:
return _normalize_table_name(table)
return None
def _extract_condition(query_string: str) -> dict:
"""
Извлекает условие из запроса.
Поддерживает множество форматов:
- где поле=значение
- где поле:значение
- где поле значение
- где поле равно значению
- где поле больше значения
- где поле > значение
- с полем значение
- у которого поле значение
- у которых поле значение
- where field=value
"""
condition = {}
# Ищем условие после "где", "where", "с", "у которого/которых"
condition_patterns = [
r"(?:где|where|с\s+условием|у\s+которого|у\s+которой|у\s+которых|который|которые|у\s+кого)\s+(.+?)(?:\s+установи|\s+обнови|\s+измени|\s+поменяй|\s+set|$)",
r"(?:если|when|if)\s+(.+?)(?:\s+то|then|$)",
]
condition_str = None
for pattern in condition_patterns:
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
condition_str = match.group(1).strip()
break
if not condition_str:
return {}
# Убираем часть после "установи", "set" и т.д.
for stopper in ['установи', 'обнови', 'измени', 'поменяй', 'set', 'update']:
if stopper in condition_str.lower():
condition_str = condition_str.split(stopper)[0].strip()
# Парсим операторы сравнения на русском
comparison_patterns = [
(rf'({ID_PATTERN})\s+больше\s+или\s+равно\s+(\S+)', '$gte'),
(rf'({ID_PATTERN})\s+меньше\s+или\s+равно\s+(\S+)', '$lte'),
(rf'({ID_PATTERN})\s+не\s+равно?\s+(\S+)', '$ne'),
(rf'({ID_PATTERN})\s+больше\s+(?:чем\s+)?(\S+)', '$gt'),
(rf'({ID_PATTERN})\s+меньше\s+(?:чем\s+)?(\S+)', '$lt'),
(rf'({ID_PATTERN})\s+равн[оа]?\s+(\S+)', '$eq'),
(rf'({ID_PATTERN})\s+содержит\s+(\S+)', '$regex'),
(rf'({ID_PATTERN})\s+начинается\s+(?:с|на)\s+(\S+)', '$regex_start'),
(rf'({ID_PATTERN})\s+заканчивается\s+на\s+(\S+)', '$regex_end'),
# Символьные операторы
(rf'({ID_PATTERN})\s*>=\s*(\S+)', '$gte'),
(rf'({ID_PATTERN})\s*<=\s*(\S+)', '$lte'),
(rf'({ID_PATTERN})\s*!=\s*(\S+)', '$ne'),
(rf'({ID_PATTERN})\s*<>\s*(\S+)', '$ne'),
(rf'({ID_PATTERN})\s*>\s*(\S+)', '$gt'),
(rf'({ID_PATTERN})\s*<\s*(\S+)', '$lt'),
(rf'({ID_PATTERN})\s*=\s*(\S+)', '$eq'),
]
for pattern, mongo_op in comparison_patterns:
match = re.search(pattern, condition_str, re.IGNORECASE)
if match:
field = _normalize_field_name(match.group(1))
value = _normalize_value(match.group(2).strip("'\""))
if mongo_op == '$eq':
condition[field] = value
elif mongo_op == '$regex':
condition[field] = {'$regex': str(value), '$options': 'i'}
elif mongo_op == '$regex_start':
condition[field] = {'$regex': f'^{value}', '$options': 'i'}
elif mongo_op == '$regex_end':
condition[field] = {'$regex': f'{value}$', '$options': 'i'}
else:
condition[field] = {mongo_op: value}
return condition
# Парсим ключ:значение или ключ=значение
if not condition:
condition = _parse_key_value_pairs(condition_str)
# Простой формат "ключ значение"
if not condition:
simple_match = re.match(rf'({ID_PATTERN})\s+([^\s,]+)', condition_str)
if simple_match:
key = _normalize_field_name(simple_match.group(1))
value = _normalize_value(simple_match.group(2))
condition[key] = value
return condition
def _extract_data(query_string: str) -> dict:
"""
Извлекает данные для INSERT/UPDATE из запроса.
Поддерживает форматы:
- установи поле значение
- set поле=значение
- поменяй поле на значение
- данные: поле1=значение1, поле2=значение2
- с полем 'значение'
"""
data = {}
# Паттерны для извлечения данных (берём до "где"/"where" если они есть)
data_patterns = [
# После "установи", "set", "поменяй на", "измени на" - до "где"/"where" или конца строки
r"(?:установи|set|поменяй\s+на|измени\s+на|замени\s+на)\s+(.+?)(?:\s+где\s+|\s+where\s+|$)",
r"(?:данные|data|values|значения)\s*:\s*(.+?)(?:\s+где\s+|\s+where\s+|$)",
r"запись\s*:\s*(.+?)(?:\s+где\s+|\s+where\s+|$)",
]
data_str = None
for pattern in data_patterns:
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
data_str = match.group(1).strip()
break
if data_str:
# Пробуем парсить как ключ:значение или ключ=значение
data = _parse_key_value_pairs(data_str)
# Если не получилось, пробуем формат "ключ значение"
if not data:
simple_match = re.match(rf'({ID_PATTERN})\s+([^\s,]+)', data_str)
if simple_match:
key = _normalize_field_name(simple_match.group(1))
value = _normalize_value(simple_match.group(2))
data[key] = value
# Также ищем формат "с ключом 'значение'" или "с ключом значение"
if not data:
with_pattern = rf"с\s+({ID_PATTERN})\s+['\"]?([^'\"]+)['\"]?"
for match in re.finditer(with_pattern, query_string, re.IGNORECASE):
key = _normalize_field_name(match.group(1))
value = _normalize_value(match.group(2))
data[key] = value
return data
def _parse_select(query_string: str) -> dict:
"""
Парсит запрос на выборку данных.
Поддерживает множество форматов:
- "Покажи все сотрудники"
- "Выведи пользователей"
- "Найди в таблице users где id=5"
- "Дай мне все записи из коллекции products"
- "все товары"
- "сотрудники где возраст больше 25"
- "users"
- "найди сотрудников возраст больше 30"
- "первые 10 товаров"
- "клиенты у которых статус активен"
"""
query_lower = query_string.lower()
# Извлекаем таблицу
table = _extract_table_name(query_string)
# Если таблица не найдена, пытаемся определить из контекста
if not table:
# Ищем существительное после ключевых слов
for keyword in SELECT_KEYWORDS:
# Паттерн: keyword [все] [мне] [из] существительное
pattern = rf'{keyword}\s+(?:\d+\s+)?(?:все\s+)?(?:мне\s+)?(?:всех\s+)?(?:перв\w+\s+)?(?:из\s+)?({ID_PATTERN})'
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
word = match.group(1).lower()
word_end = match.end(1)
remaining = query_string[word_end:].strip().lower()
service_words = {'все', 'всё', 'из', 'в', 'где', 'записи', 'данные',
'документы', 'элементы', 'мне', 'нам', 'это', 'таблицы', 'коллекции',
'первые', 'первый', 'топ', 'лимит'}
# Если слово есть в TABLE_SYNONYMS - это точно таблица
if word in TABLE_SYNONYMS:
table = TABLE_SYNONYMS[word]
break
# Проверяем, не является ли слово полем с оператором сравнения после него
is_field_with_operator = (
word in FIELD_SYNONYMS and
(remaining.startswith('больше') or remaining.startswith('меньше') or
remaining.startswith('равн') or remaining.startswith('>') or
remaining.startswith('<') or remaining.startswith('='))
)
if word not in service_words and not is_field_with_operator:
table = _normalize_table_name(word)
break
# Ищем таблицу после "первые N", "первых N" или "топ N"
if not table:
first_n_match = re.search(rf'(?:перв\w*|топ|top)\s+(\d+)\s+({ID_PATTERN})', query_string, re.IGNORECASE)
if first_n_match:
word = first_n_match.group(2).lower()
if word in TABLE_SYNONYMS:
table = TABLE_SYNONYMS[word]
else:
table = _normalize_table_name(word)
# Ищем таблицу после "все" или "всех"
if not table:
all_match = re.search(rf'все[хй]?\s+({ID_PATTERN})', query_string, re.IGNORECASE)
if all_match:
word = all_match.group(1).lower()
# Если слово есть в TABLE_SYNONYMS - это точно таблица
if word in TABLE_SYNONYMS:
table = TABLE_SYNONYMS[word]
elif word not in {'записи', 'данные', 'документы', 'элементы'}:
table = _normalize_table_name(word)
# Ищем таблицу в начале запроса (например, "users" или "товары цена больше 1000")
if not table:
first_word_match = re.match(rf'^({ID_PATTERN})\s*', query_string, re.IGNORECASE)
if first_word_match:
word = first_word_match.group(1).lower()
service_words = {'все', 'всё', 'покажи', 'выведи', 'найди', 'дай', 'получи', 'выбери', 'обнови', 'удали', 'добавь', 'вставь', 'создай'}
# Если слово - таблица из TABLE_SYNONYMS
if word in TABLE_SYNONYMS:
table = TABLE_SYNONYMS[word]
elif word not in service_words and word not in FIELD_SYNONYMS:
table = _normalize_table_name(word)
# Колонки
columns = ['*']
columns_patterns = [
r"(?:покажи|выведи|выбери|дай|верни)\s+(.+?)\s+\b(?:из|в|от|для)\b",
r"(?:только|поля|колонки)\s+(.+?)\s+\b(?:из|в)\b",
]
for pattern in columns_patterns:
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
cols_str = match.group(1).strip()
skip_words = {'все', 'всё', '*', 'всех', 'мне', 'мне все', 'первые', 'топ', 'первый'}
# Проверяем не содержит ли cols_str только служебные слова или числа
words_in_cols = cols_str.lower().split()
is_service = all(w in skip_words or w.isdigit() for w in words_in_cols)
if not is_service:
columns = [col.strip() for col in cols_str.split(',')]
columns = [_normalize_field_name(c) for c in columns if c and c.lower() not in skip_words and not c.isdigit()]
if not columns:
columns = ['*']
break
# Условие - также ищем без "где" если есть операторы сравнения
condition = _extract_condition(query_string)
# Если нет условия через "где", ищем прямые операторы сравнения после существительного
if not condition:
# Паттерн: таблица + поле + оператор + значение
direct_patterns = [
(rf'{ID_PATTERN}\s+({ID_PATTERN})\s+больше\s+(?:чем\s+)?(\S+)', '$gt'),
(rf'{ID_PATTERN}\s+({ID_PATTERN})\s+меньше\s+(?:чем\s+)?(\S+)', '$lt'),
(rf'{ID_PATTERN}\s+({ID_PATTERN})\s+равн[оа]?\s+(\S+)', '$eq'),
(rf'{ID_PATTERN}\s+({ID_PATTERN})\s*>\s*(\S+)', '$gt'),
(rf'{ID_PATTERN}\s+({ID_PATTERN})\s*<\s*(\S+)', '$lt'),
(rf'{ID_PATTERN}\s+({ID_PATTERN})\s*=\s*(\S+)', '$eq'),
]
for pattern, mongo_op in direct_patterns:
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
field = _normalize_field_name(match.group(1))
value = _normalize_value(match.group(2).strip("'\""))
if mongo_op == '$eq':
condition[field] = value
else:
condition[field] = {mongo_op: value}
break
# Если всё ещё нет условия, ищем простое "поле значение" после таблицы (подразумевается равенство)
if not condition and table:
# Паттерн: таблица поле значение (без оператора)
# Ищем последовательность: слово_таблицы поле значение
simple_eq_pattern = rf'({ID_PATTERN})\s+({ID_PATTERN})\s+(\S+)$'
match = re.search(simple_eq_pattern, query_string.strip(), re.IGNORECASE)
if match:
potential_table = match.group(1).lower()
field_word = match.group(2).lower()
value = match.group(3)
# Проверяем, что первое слово - это таблица, второе - поле
if (potential_table in TABLE_SYNONYMS or potential_table == table.lower()) and field_word in FIELD_SYNONYMS:
field = _normalize_field_name(field_word)
condition[field] = _normalize_value(value.strip("'\""))
# Лимит
limit = None
# Паттерн: "первые N", "топ N", "limit N", "N первых"
limit_match = re.search(r'(?:лимит|limit|первые?|топ|top)\s+(\d+)', query_string, re.IGNORECASE)
if limit_match:
limit = int(limit_match.group(1))
else:
# Паттерн: "N первых", "5 первых"
limit_match2 = re.search(r'(\d+)\s+перв\w*', query_string, re.IGNORECASE)
if limit_match2:
limit = int(limit_match2.group(1))
# Сортировка
sort = None
sort_patterns = [
rf'(?:сортировка|sort|order)\s+(?:по|by)\s+({ID_PATTERN})\s*(asc|desc|возрастанию?|убыванию?)?',
rf'(?:по\s+(?:порядку|алфавиту))\s+(?:по)?\s*({ID_PATTERN})?',
]
for pattern in sort_patterns:
sort_match = re.search(pattern, query_string, re.IGNORECASE)
if sort_match:
field = _normalize_field_name(sort_match.group(1)) if sort_match.group(1) else 'id'
direction = 1
if sort_match.lastindex >= 2 and sort_match.group(2):
dir_str = sort_match.group(2).lower()
if dir_str in ('desc', 'убыванию', 'убывания'):
direction = -1
sort = [(field, direction)]
break
result = {
'operation': 'SELECT',
'table': table,
'columns': columns,
'condition': condition
}
if limit:
result['limit'] = limit
if sort:
result['sort'] = sort
return result
def _parse_insert(query_string: str) -> dict:
"""
Парсит запрос на добавление данных.
Поддерживает форматы:
- "Добавь в таблицу 'users' запись: name='Иван', age=25"
- "Вставь пользователя с именем Иван"
- "Создай нового сотрудника имя Петр должность инженер"
- "Добавь в коллекцию products товар с названием Телефон"
"""
# Извлекаем таблицу
table = _extract_table_name(query_string)
# Если не найдена, ищем по контексту
if not table:
for keyword in INSERT_KEYWORDS:
pattern = rf'{keyword}\s+(?:нового?|новую)?\s*({ID_PATTERN})'
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
word = match.group(1).lower()
service_words = {'в', 'запись', 'данные', 'документ', 'элемент', 'новый', 'новую', 'нового'}
if word not in service_words:
table = _normalize_table_name(word)
break
# Извлекаем данные
data = _extract_data(query_string)
# Ищем данные в формате "с чем-то 'значение'" или "поле:значение"
if not data:
data = _parse_key_value_pairs(query_string)
# Ищем формат "имя Иван возраст 25" (без разделителей)
if not data:
# Паттерн для последовательных пар "слово значение"
pairs_pattern = rf'({ID_PATTERN})\s+([^\s]+)'
potential_pairs = re.findall(pairs_pattern, query_string, re.IGNORECASE)
for key, value in potential_pairs:
key_lower = key.lower()
# Проверяем, что это похоже на название поля
if key_lower in FIELD_SYNONYMS or key_lower not in {'добавь', 'вставь', 'создай', 'в', 'из', 'с', 'и', 'на', 'новый', 'нового', 'новую'}:
if key_lower not in {'добавь', 'вставь', 'создай', 'в', 'из', 'с', 'и', 'на', 'новый', 'нового', 'новую', 'запись', 'данные'}:
data[_normalize_field_name(key)] = _normalize_value(value)
return {
'operation': 'INSERT',
'table': table,
'data': data
}
def _parse_update(query_string: str) -> dict:
"""
Парсит запрос на обновление данных.
Поддерживает форматы:
- "UPDATE users SET name='Иван' WHERE id=5"
- "Обнови в таблице 'users' где id=5 установи name='Иван'"
- "Измени пользователя где имя Петр поменяй имя на Антон"
- "Обнови документ в Сотрудники где имя Петр установи имя Антон"
- "обнови Сотрудники установи имя=Антон где имя=Петр3"
"""
query_lower = query_string.lower()
# Проверяем SQL формат: UPDATE table SET ... WHERE ...
sql_match = re.match(
rf'update\s+({ID_PATTERN})\s+set\s+(.+?)(?:\s+where\s+(.+))?$',
query_string,
re.IGNORECASE
)
if sql_match:
table = sql_match.group(1)
set_part = sql_match.group(2).strip()
where_part = sql_match.group(3).strip() if sql_match.group(3) else None
# Парсим SET часть
data = _parse_key_value_pairs(set_part)
# Парсим WHERE часть
condition = {}
if where_part:
condition = _parse_key_value_pairs(where_part)
return {
'operation': 'UPDATE',
'table': table,
'data': data,
'condition': condition
}
# Извлекаем таблицу
table = _extract_table_name(query_string)
# Если не найдена, ищем по контексту
if not table:
for keyword in UPDATE_KEYWORDS:
pattern = rf'{keyword}\s+(?:документ\s+)?(?:запись\s+)?(?:в\s+)?({ID_PATTERN})'
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
word = match.group(1).lower()
service_words = {'в', 'документ', 'запись', 'данные', 'где', 'поле', 'установи', 'set'}
if word not in service_words:
table = _normalize_table_name(word)
break
# Извлекаем условие - для UPDATE важен порядок: условие может быть ДО или ПОСЛЕ данных
condition = _extract_condition(query_string)
# Извлекаем данные для обновления
data = _extract_data(query_string)
# Дополнительный поиск данных если пусто
if not data:
# Формат "поменяй X на Y" или "замени X на Y"
change_patterns = [
rf'(?:поменяй|замени|измени)\s+({ID_PATTERN})\s+(?:на|в)\s+([^\s,]+)',
]
for pattern in change_patterns:
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
key = _normalize_field_name(match.group(1))
value = _normalize_value(match.group(2))
data[key] = value
break
# Убедимся, что данные для обновления не содержат поля из условия с теми же значениями
# (это могло произойти из-за неправильного парсинга)
if data and condition:
for key in list(data.keys()):
if key in condition and data[key] == condition[key]:
del data[key]
return {
'operation': 'UPDATE',
'table': table,
'data': data,
'condition': condition
}
def _parse_delete(query_string: str) -> dict:
"""
Парсит запрос на удаление данных.
Поддерживает форматы:
- "Удали из таблицы 'users' где id=5"
- "Убери пользователя где имя Петр"
- "Удали из коллекции Сотрудники где имя Петр"
- "Очисти записи из orders где status='cancelled'"
- "delete from users where id=5"
"""
# Извлекаем таблицу
table = _extract_table_name(query_string)
# Если не найдена, ищем по контексту
if not table:
for keyword in DELETE_KEYWORDS:
pattern = rf'{keyword}\s+(?:из\s+)?({ID_PATTERN})'
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
word = match.group(1).lower()
service_words = {'из', 'запись', 'записи', 'данные', 'документ', 'документы', 'где', 'все', 'всё'}
if word not in service_words:
table = _normalize_table_name(word)
break
# Извлекаем условие
condition = _extract_condition(query_string)
return {
'operation': 'DELETE',
'table': table,
'condition': condition
}
def _parse_create_table(query_string: str) -> dict:
"""
Парсит запрос на создание таблицы/коллекции.
Поддерживает форматы:
- "Создай таблицу 'users' с колонками: id INTEGER PRIMARY KEY, name TEXT"
- "Создай коллекцию products"
- "Новая таблица orders"
"""
# Ищем название таблицы
table = None
table_patterns = [
r"(?:таблиц[у|а]|коллекци[ю|я])\s+['\"]([^'\"]+)['\"]",
rf"(?:таблиц[уа]|коллекци[юя])\s+({ID_PATTERN})",
rf"(?:новая?|create)\s+(?:таблиц[уа]|коллекци[юя]|table|collection)\s+({ID_PATTERN})",
]
for pattern in table_patterns:
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
table = match.group(1)
break
# Ищем определение колонок
columns_match = re.search(r"(?:с колонками|колонки|columns)\s*:?\s*(.*)", query_string, re.IGNORECASE)
columns_def = columns_match.group(1).strip() if columns_match else ""
if not table:
return {'error': 'Не удалось распознать название таблицы'}
# Если есть определение колонок, создаём SQL
if columns_def:
sql_query = f"CREATE TABLE {table} ({columns_def})"
return {'operation': 'RAW_SQL', 'query': sql_query}
# Иначе это MongoDB коллекция
return {'operation': 'CREATE_TABLE', 'table': table}
def _parse_create_database(query_string: str) -> dict:
"""
Парсит запрос на создание базы данных.
Поддерживает форматы:
- "Создай БД 'ТЕСТ_30'"
- "Создай базу данных test_db"
- "CREATE DATABASE mydb"
"""
# Паттерны для извлечения имени БД
db_patterns = [
r"(?:бд|база данных|базу данных|database|db)\s+['\"`]([^'\"`]+)['\"`]", # с кавычками
rf"(?:бд|база данных|базу данных|database|db)\s+({ID_PATTERN})", # без кавычек
r"(?:базу)\s+['\"`]([^'\"`]+)['\"`]", # "базу" с кавычками
rf"(?:базу)\s+({ID_PATTERN})", # "базу" без кавычек (если не "базу данных")
]
db_name = None
for pattern in db_patterns:
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
candidate = match.group(1)
# Пропускаем "данных" как имя БД (это часть "базу данных")
if candidate.lower() != 'данных':
db_name = candidate
break
if not db_name:
return {'error': 'Не удалось распознать название базы данных'}
return {'operation': 'CREATE_DATABASE', 'database': db_name}
def _parse_drop_table(query_string: str) -> dict:
"""
Парсит запрос на удаление таблицы/коллекции.
Поддерживает форматы:
- "Удали таблицу 'users'"
- "Удали коллекцию products"
- "Drop table orders"
"""
table = None
table_patterns = [
r"(?:таблиц[уа]|коллекци[юя])\s+['\"]([^'\"]+)['\"]",
rf"(?:таблиц[уа]|коллекци[юя])\s+({ID_PATTERN})",
rf"(?:drop)\s+(?:table|collection)\s+({ID_PATTERN})",
]
for pattern in table_patterns:
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
table = match.group(1)
break
if not table:
return {'error': 'Не удалось распознать название таблицы'}
return {'operation': 'DROP_TABLE', 'table': table}
def _parse_alter(query_string: str) -> dict:
"""
Парсит запрос на изменение структуры таблицы.
Поддерживает форматы:
- "Добавь столбец 'email' тип 'TEXT' в таблицу 'users'"
- "Удали столбец 'phone' из таблицы 'users'"
- "Добавь поле age в users"
- "Переименуй столбец name в full_name в таблице users"
"""
table = _extract_table_name(query_string)
query_lower = query_string.lower()
# Добавление столбца
if any(word in query_lower for word in ['добавь столбец', 'добавь поле', 'добавь колонку', 'add column']):
column_patterns = [
r"(?:столбец|поле|колонку)\s+['\"]([^'\"]+)['\"]",
rf"(?:столбец|поле|колонку)\s+({ID_PATTERN})",
]
column_name = None
for pattern in column_patterns:
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
column_name = match.group(1)
break
# Тип столбца
type_match = re.search(r"(?:тип|type)\s+['\"]?(\w+)['\"]?", query_string, re.IGNORECASE)
column_type = type_match.group(1) if type_match else 'TEXT'
return {
'operation': 'ALTER',
'action': 'ADD_COLUMN',
'table': table,
'column_name': column_name,
'column_type': column_type
}
# Удаление столбца
elif any(word in query_lower for word in ['удали столбец', 'удали поле', 'удали колонку', 'drop column']):
column_patterns = [
r"(?:столбец|поле|колонку)\s+['\"]([^'\"]+)['\"]",
rf"(?:столбец|поле|колонку)\s+({ID_PATTERN})",
]
column_name = None
for pattern in column_patterns:
match = re.search(pattern, query_string, re.IGNORECASE)
if match:
column_name = match.group(1)
break
return {
'operation': 'ALTER',
'action': 'DROP_COLUMN',
'table': table,
'column_name': column_name
}
# Переименование столбца
elif any(word in query_lower for word in ['переименуй столбец', 'переименуй поле', 'rename column']):
rename_match = re.search(
rf"(?:столбец|поле)\s+({ID_PATTERN})\s+(?:в|на|to)\s+({ID_PATTERN})",
query_string, re.IGNORECASE
)
if rename_match:
return {
'operation': 'ALTER',
'action': 'RENAME_COLUMN',
'table': table,
'column_name': rename_match.group(1),
'new_column_name': rename_match.group(2)
}
return {'error': 'Не удалось распознать операцию ALTER'}
def _parse_mongo_shell_command(query_string: str) -> dict:
"""
Парсит MongoDB shell команды вида db.collection.operation()
"""
query = query_string.strip()
if query.startswith('db.'):
match = re.match(r'db\.(\w+)\.(\w+)\((.*)\)', query)
if match:
collection_name = match.group(1)
operation = match.group(2)
params_str = match.group(3).strip()
# Определяем тип операции
if operation in ['find', 'findOne']:
op_type = 'SELECT'
elif operation in ['insertOne', 'insertMany']:
op_type = 'INSERT'
elif operation in ['updateOne', 'updateMany', 'replaceOne']:
op_type = 'UPDATE'
elif operation in ['deleteOne', 'deleteMany']:
op_type = 'DELETE'
elif operation in ['countDocuments', 'distinct', 'aggregate']:
op_type = 'SELECT'
elif operation == 'drop':
op_type = 'DROP_TABLE'
elif operation == 'createIndex':
op_type = 'RAW_MONGO'
else:
op_type = 'RAW_MONGO'
result = {
'operation': op_type,
'table': collection_name,
'raw_mongo': query
}
if params_str:
try:
params = ast.literal_eval(f'({params_str})')
if isinstance(params, tuple):
if len(params) >= 1 and isinstance(params[0], dict):
result['condition'] = params[0]
if len(params) >= 2:
if op_type == 'SELECT' and isinstance(params[1], dict):
result['projection'] = params[1]
elif op_type == 'UPDATE' and isinstance(params[1], dict):
result['data'] = params[1]
elif isinstance(params, dict):
if op_type in ['SELECT', 'DELETE']:
result['condition'] = params
elif op_type == 'INSERT':
result['data'] = params
except:
result['raw_params'] = params_str
return result
# Дополнительные MongoDB команды
lower_query = query.lower()
if any(cmd in lower_query for cmd in ['getcollectionnames', 'listcollectionnames', 'show collections']):
return {'operation': 'RAW_MONGO', 'raw_mongo': query}
elif 'createcollection' in lower_query:
match = re.search(r'db\.createCollection\(\s*["\'](\w+)["\']', query)
if match:
return {'operation': 'CREATE_TABLE', 'table': match.group(1), 'raw_mongo': query}
elif '.drop()' in query and query.startswith('db.'):
match = re.search(r'db\.(\w+)\.drop\(\)', query)
if match:
return {'operation': 'DROP_TABLE', 'table': match.group(1), 'raw_mongo': query}
elif 'stats()' in lower_query:
return {'operation': 'RAW_MONGO', 'raw_mongo': query}
return None
def _parse_mongo_operations(query_string: str) -> dict:
"""Парсит MongoDB специфичные операции на естественном языке"""
query_lower = query_string.lower().strip()
# Создание коллекции
if any(phrase in query_lower for phrase in ['создай коллекцию', 'создать коллекцию', 'новая коллекция']):
table = _extract_table_name(query_string)
if not table:
match = re.search(rf"коллекци[юя]\s+['\"]?({ID_PATTERN})['\"]?", query_string, re.IGNORECASE)
if match:
table = match.group(1)
return {'operation': 'CREATE_TABLE', 'table': table}
# Удаление коллекции
if any(phrase in query_lower for phrase in ['удали коллекцию', 'удалить коллекцию', 'drop collection']):
table = _extract_table_name(query_string)
if not table:
match = re.search(rf"коллекци[юя]\s+['\"]?({ID_PATTERN})['\"]?", query_string, re.IGNORECASE)
if match:
table = match.group(1)
return {'operation': 'DROP_TABLE', 'table': table}
# Агрегация
if any(phrase in query_lower for phrase in ['агрегация', 'aggregate', 'группировка', 'group by']):
table = _extract_table_name(query_string)
return {'operation': 'RAW_MONGO', 'table': table, 'raw_mongo': query_string}
return None
def _parse_sql_select(query_string: str) -> dict:
"""
Парсит SQL SELECT запрос.
Поддерживает: SELECT * FROM table, SELECT col1, col2 FROM table WHERE condition
Поддерживает: ORDER BY, LIMIT, OFFSET, GROUP BY
"""
query = query_string.strip()
# Регулярное выражение для парсинга SELECT
match = re.match(
r'SELECT\s+(.+?)\s+FROM\s+["\']?({ID_PATTERN_UNICODE})["\']?'
r'(?:\s+WHERE\s+(.+?))?'
r'(?:\s+GROUP\s+BY\s+(.+?))?'
r'(?:\s+ORDER\s+BY\s+(.+?))?'
r'(?:\s+LIMIT\s+(\d+))?'
r'(?:\s+OFFSET\s+(\d+))?'
r'\s*$',
query,
re.IGNORECASE
)
if match:
columns_str = match.group(1).strip()
table = match.group(2).strip()
where_clause = match.group(3)
group_by = match.group(4)
order_by = match.group(5)
limit = match.group(6)
offset = match.group(7)
# Парсим колонки
if columns_str == '*':
columns = ['*']
else:
columns = [col.strip().strip('"\'') for col in columns_str.split(',')]
result = {
'operation': 'SELECT',
'table': table,
'columns': columns,
'condition': {}
}
if where_clause:
result['condition'] = _parse_sql_where(where_clause)
if limit:
result['limit'] = int(limit)
if offset:
result['offset'] = int(offset)
if order_by:
# Парсим ORDER BY
sort = []
for part in order_by.split(','):
part = part.strip()
if ' DESC' in part.upper():
field = part.upper().replace(' DESC', '').strip()
sort.append((field, -1))
elif ' ASC' in part.upper():
field = part.upper().replace(' ASC', '').strip()
sort.append((field, 1))
else:
sort.append((part, 1))
result['sort'] = sort
return result
return None
def _parse_sql_insert(query_string: str) -> dict:
"""
Парсит SQL INSERT запрос.
Поддерживает:
- INSERT INTO table (col1, col2) VALUES (val1, val2)
- INSERT INTO table (col1, col2) VALUES (val1, val2), (val3, val4), ... (bulk insert)
"""
query = query_string.strip()
# Проверяем на bulk INSERT (несколько групп VALUES)
# Если есть ), ( - это bulk insert, передаём как RAW_SQL
if re.search(r'\),\s*\(', query):
return {
'operation': 'RAW_SQL',
'query': query
}
match = re.match(
rf'INSERT\s+INTO\s+["\']?({ID_PATTERN_UNICODE})["\']?\s*\((.+?)\)\s*VALUES\s*\((.+?)\)',
query,
re.IGNORECASE
)
if match:
table = match.group(1).strip()
columns_str = match.group(2).strip()
values_str = match.group(3).strip()
columns = [col.strip().strip('"\'') for col in columns_str.split(',')]
# Парсим значения с учетом кавычек
values = []
in_quotes = False
quote_char = None
current_val = ''
for char in values_str + ',':
if char in ('"', "'") and not in_quotes:
in_quotes = True
quote_char = char
elif char == quote_char and in_quotes:
in_quotes = False
quote_char = None
elif char == ',' and not in_quotes:
val = current_val.strip().strip('"\'')
values.append(_normalize_value(val))
current_val = ''
continue
current_val += char
data = dict(zip(columns, values))
return {
'operation': 'INSERT',
'table': table,
'data': data
}
return None
def _parse_sql_update(query_string: str) -> dict:
"""
Парсит SQL UPDATE запрос.
Поддерживает: UPDATE table SET col1 = val1, col2 = val2 WHERE condition
Для сложных условий (IN, BETWEEN и т.д.) передаёт как RAW_SQL.
"""
query = query_string.strip()
# Проверяем на сложные условия которые лучше передать как RAW_SQL
query_upper = query.upper()
complex_conditions = [' IN ', ' IN(', ' NOT IN ', ' BETWEEN ', ' LIKE ', ' IS NULL', ' IS NOT NULL']
if any(cond in query_upper for cond in complex_conditions):
return {
'operation': 'RAW_SQL',
'query': query
}
match = re.match(
rf'UPDATE\s+["\']?({ID_PATTERN_UNICODE})["\']?\s+SET\s+(.+?)(?:\s+WHERE\s+(.+))?$',
query,
re.IGNORECASE
)
if match:
table = match.group(1).strip()
set_clause = match.group(2).strip()
where_clause = match.group(3)
# Парсим SET clause - поддержка кириллицы и других символов Unicode
data = {}
# Паттерн для поля=значение с поддержкой кириллицы
set_pattern = rf'["\']?({ID_PATTERN_UNICODE})["\']?\s*=\s*(?:\'([^\']*?)\'|"([^"]*?)"|([^\s,]+))'
for match_set in re.finditer(set_pattern, set_clause, re.UNICODE):
col = match_set.group(1)
# Берём первое не-None значение
val = match_set.group(2) or match_set.group(3) or match_set.group(4) or ''
data[col] = _normalize_value(val)
result = {
'operation': 'UPDATE',
'table': table,
'data': data,
'condition': {}
}
if where_clause:
result['condition'] = _parse_sql_where(where_clause)
return result
return None
def _parse_sql_delete(query_string: str) -> dict:
"""
Парсит SQL DELETE запрос.
Поддерживает: DELETE FROM table WHERE condition
Для сложных условий (IN, BETWEEN и т.д.) передаёт как RAW_SQL.
"""
query = query_string.strip()
# Проверяем на сложные условия которые лучше передать как RAW_SQL
query_upper = query.upper()
complex_conditions = [' IN ', ' IN(', ' NOT IN ', ' BETWEEN ', ' LIKE ', ' IS NULL', ' IS NOT NULL']
if any(cond in query_upper for cond in complex_conditions):
return {
'operation': 'RAW_SQL',
'query': query
}
match = re.match(
rf'DELETE\s+FROM\s+["\']?({ID_PATTERN_UNICODE})["\']?(?:\s+WHERE\s+(.+))?',
query,
re.IGNORECASE
)
if match:
table = match.group(1).strip()
where_clause = match.group(2)
result = {
'operation': 'DELETE',
'table': table,
'condition': {}
}
if where_clause:
result['condition'] = _parse_sql_where(where_clause)
return result
return None
def _parse_sql_where(where_clause: str) -> dict:
"""
Парсит SQL WHERE clause в MongoDB-совместимый формат.
Поддерживает: =, >, <, >=, <=, !=, <>, IN, LIKE, BETWEEN, IS NULL, IS NOT NULL
"""
if not where_clause:
return {}
condition = {}
where_clause = where_clause.strip()
# Паттерн для идентификатора с поддержкой Unicode (кириллица и т.д.)
ID_PAT = ID_PATTERN_UNICODE
# Паттерн для значения: строка в кавычках, число или слово с Unicode
VAL_PAT = r'(?:\'([^\']*?)\'|"([^"]*?)"|([^\s,\)]+))'
# Операторы сравнения с поддержкой Unicode
operators = [
(rf'({ID_PAT})\s*>=\s*{VAL_PAT}', '$gte'),
(rf'({ID_PAT})\s*<=\s*{VAL_PAT}', '$lte'),
(rf'({ID_PAT})\s*!=\s*{VAL_PAT}', '$ne'),
(rf'({ID_PAT})\s*<>\s*{VAL_PAT}', '$ne'),
(rf'({ID_PAT})\s*>\s*{VAL_PAT}', '$gt'),
(rf'({ID_PAT})\s*<\s*{VAL_PAT}', '$lt'),
(rf'({ID_PAT})\s*=\s*{VAL_PAT}', '$eq'),
]
# IS NULL / IS NOT NULL
null_match = re.search(rf'({ID_PAT})\s+IS\s+NOT\s+NULL', where_clause, re.IGNORECASE)
if null_match:
condition[null_match.group(1)] = {'$ne': None}
where_clause = re.sub(rf'({ID_PAT})\s+IS\s+NOT\s+NULL', '', where_clause, flags=re.IGNORECASE)
null_match = re.search(rf'({ID_PAT})\s+IS\s+NULL', where_clause, re.IGNORECASE)
if null_match:
condition[null_match.group(1)] = None
where_clause = re.sub(rf'({ID_PAT})\s+IS\s+NULL', '', where_clause, flags=re.IGNORECASE)
# BETWEEN
between_match = re.search(rf'({ID_PAT})\s+BETWEEN\s+(\d+)\s+AND\s+(\d+)', where_clause, re.IGNORECASE)
if between_match:
field = between_match.group(1)
low = _normalize_value(between_match.group(2))
high = _normalize_value(between_match.group(3))
condition[field] = {'$gte': low, '$lte': high}
where_clause = re.sub(rf'({ID_PAT})\s+BETWEEN\s+\d+\s+AND\s+\d+', '', where_clause, flags=re.IGNORECASE)
# LIKE
like_match = re.search(rf'({ID_PAT})\s+LIKE\s+\'([^\']+)\'', where_clause, re.IGNORECASE)
if like_match:
field = like_match.group(1)
pattern = like_match.group(2)
regex_pattern = pattern.replace('%', '.*').replace('_', '.')
if pattern.startswith('%') and pattern.endswith('%'):
condition[field] = {'$regex': regex_pattern[2:-2], '$options': 'i'}
elif pattern.startswith('%'):
condition[field] = {'$regex': regex_pattern[2:] + '$', '$options': 'i'}
elif pattern.endswith('%'):
condition[field] = {'$regex': '^' + regex_pattern[:-2], '$options': 'i'}
else:
condition[field] = {'$regex': regex_pattern, '$options': 'i'}
where_clause = re.sub(rf'({ID_PAT})\s+LIKE\s+\'[^\']+\'', '', where_clause, flags=re.IGNORECASE)
# IN
in_match = re.search(rf'({ID_PAT})\s+IN\s*\(([^)]+)\)', where_clause, re.IGNORECASE)
if in_match:
field = in_match.group(1)
values_str = in_match.group(2)
values = [_normalize_value(v.strip().strip('"\'')) for v in values_str.split(',')]
condition[field] = {'$in': values}
where_clause = re.sub(rf'({ID_PAT})\s+IN\s*\([^)]+\)', '', where_clause, flags=re.IGNORECASE)
# NOT IN
not_in_match = re.search(rf'({ID_PAT})\s+NOT\s+IN\s*\(([^)]+)\)', where_clause, re.IGNORECASE)
if not_in_match:
field = not_in_match.group(1)
values_str = not_in_match.group(2)
values = [_normalize_value(v.strip().strip('"\'')) for v in values_str.split(',')]
condition[field] = {'$nin': values}
where_clause = re.sub(rf'({ID_PAT})\s+NOT\s+IN\s*\([^)]+\)', '', where_clause, flags=re.IGNORECASE)
# Остальные операторы
for pattern, mongo_op in operators:
for match in re.finditer(pattern, where_clause, re.IGNORECASE):
field = match.group(1)
# Берём первое не-None значение из групп 2, 3, 4
val = match.group(2) or match.group(3) or match.group(4)
val = _normalize_value(val)
if mongo_op == '$eq':
condition[field] = val
else:
condition[field] = {mongo_op: val}
return condition
return condition
def _detect_operation_type(query_string: str) -> str:
"""
Определяет тип операции по ключевым словам в запросе.
Возвращает: 'SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'DROP', 'ALTER', 'UNKNOWN'
"""
query_lower = query_string.lower().strip()
# SQL команды
if query_lower.startswith('select'):
return 'SELECT'
if query_lower.startswith('insert'):
return 'INSERT'
if query_lower.startswith('update'):
return 'UPDATE'
if query_lower.startswith('delete'):
return 'DELETE'
if query_lower.startswith('create'):
return 'CREATE'
if query_lower.startswith('drop'):
return 'DROP'
if query_lower.startswith('alter'):
return 'ALTER'
# Естественный язык - SELECT
for keyword in SELECT_KEYWORDS:
if query_lower.startswith(keyword) or f' {keyword} ' in f' {query_lower} ':
return 'SELECT'
# Проверяем "все X" как SELECT
if re.match(r'^все\s+', query_lower):
return 'SELECT'
# Проверяем "первые N X" или "топ N X" как SELECT
if re.match(r'^(?:перв\w*|топ|top)\s+\d+', query_lower):
return 'SELECT'
# Проверяем "X у которых Y" или "X с Y" как SELECT
if re.search(r'у\s+котор\w+\s+', query_lower) or re.search(r'\s+с\s+\w+ом\s+', query_lower):
return 'SELECT'
# Проверяем "X где Y" как SELECT (таблица + условие)
if re.search(rf'^{ID_PATTERN}\s+где\s+', query_string, re.IGNORECASE):
return 'SELECT'
# Проверяем если запрос - просто имя таблицы
if re.match(rf'^{ID_PATTERN}$', query_string):
return 'SELECT'
# Проверяем: если первое слово - таблица из TABLE_SYNONYMS и есть условие/оператор, то SELECT
# Паттерн: таблица + поле + оператор/значение (например: "товары цена больше 1000")
first_word_match = re.match(rf'^({ID_PATTERN})\s+', query_string, re.IGNORECASE)
if first_word_match:
first_word = first_word_match.group(1).lower()
if first_word in TABLE_SYNONYMS:
# Проверяем есть ли что-то похожее на условие (поле+оператор или поле+значение)
rest = query_string[first_word_match.end():].strip().lower()
# Если после таблицы идёт поле из FIELD_SYNONYMS - это SELECT с условием
for field in FIELD_SYNONYMS:
if rest.startswith(field):
return 'SELECT'
# Если есть операторы сравнения - это SELECT
if any(op in rest for op in ['больше', 'меньше', 'равн', '>', '<', '=']):
return 'SELECT'
# CREATE DATABASE - проверяем ДО INSERT, т.к. "создай" есть в INSERT_KEYWORDS
if any(phrase in query_lower for phrase in ['создай бд', 'создать бд', 'создай базу', 'создать базу', 'новая база', 'создай db', 'создать db', 'create database']):
return 'CREATE_DATABASE'
# CREATE TABLE - проверяем ДО INSERT
if any(phrase in query_lower for phrase in ['создай таблиц', 'создать таблиц', 'создай коллекц', 'создать коллекц', 'новая таблица', 'новая коллекция']):
return 'CREATE'
# Естественный язык - INSERT
for keyword in INSERT_KEYWORDS:
if query_lower.startswith(keyword):
return 'INSERT'
# Естественный язык - UPDATE
for keyword in UPDATE_KEYWORDS:
if query_lower.startswith(keyword):
return 'UPDATE'
# Естественный язык - DELETE (но не таблицу/столбец)
for keyword in DELETE_KEYWORDS:
if query_lower.startswith(keyword):
# Проверяем, что это не DROP TABLE или ALTER
if 'таблиц' in query_lower or 'столбец' in query_lower or 'колонк' in query_lower or 'поле' in query_lower:
if 'столбец' in query_lower or 'колонк' in query_lower or 'поле' in query_lower:
return 'ALTER'
return 'DROP'
return 'DELETE'
return 'UNKNOWN'
def parse_query(query_string: str) -> dict:
"""
Универсальный парсер запросов.
Поддерживает:
- SQL запросы (SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, ALTER)
- MongoDB shell команды (db.collection.find(), etc.)
- Естественный язык на русском и английском
Примеры:
- "Покажи все сотрудники"
- "все пользователи где возраст больше 25"
- "Добавь пользователя имя Иван возраст 25"
- "Обнови в users где id=5 установи name Петр"
- "Удали из orders где status cancelled"
- "SELECT * FROM users WHERE age > 25"
- "db.users.find({age: {$gt: 25}})"
"""
if not query_string or not query_string.strip():
return {'error': 'Пустой запрос'}
query_string = query_string.strip()
query_lower = query_string.lower()
# 1. Проверяем специальные MongoDB команды
if query_lower in ['show collections', 'list collections', 'db.stats()']:
return {'operation': 'RAW_MONGO', 'raw_mongo': query_string}
# 2. MongoDB shell команды
if query_string.startswith('db.'):
result = _parse_mongo_shell_command(query_string)
if result:
return result
# 3. MongoDB операции на естественном языке
mongo_result = _parse_mongo_operations(query_string)
if mongo_result:
return mongo_result
# 4. Определяем тип операции
op_type = _detect_operation_type(query_string)
# 5. Парсим в зависимости от типа операции
# SQL SELECT
if query_lower.startswith('select'):
result = _parse_sql_select(query_string)
if result:
return result
# SQL INSERT
if query_lower.startswith('insert'):
result = _parse_sql_insert(query_string)
if result:
return result
# SQL UPDATE
if query_lower.startswith('update'):
result = _parse_sql_update(query_string)
if result:
return result
# SQL DELETE
if query_lower.startswith('delete'):
result = _parse_sql_delete(query_string)
if result:
return result
# RAW SQL для CREATE, DROP, ALTER, PRAGMA, SHOW и других
sql_raw_keywords = ['create table', 'drop table', 'alter table', 'pragma', 'truncate', 'show tables', 'show databases', 'show columns', 'show index', 'show create', 'show status', 'show variables', 'describe ', 'desc ']
if any(query_lower.startswith(keyword) for keyword in sql_raw_keywords):
return {'operation': 'RAW_SQL', 'query': query_string}
# CREATE DATABASE / USE DATABASE
if query_lower.startswith('create database'):
match = re.search(r'create\s+database\s+[`"\']?([^`"\'\s;]+)[`"\']?', query_string, re.IGNORECASE)
if match:
return {'operation': 'CREATE_DATABASE', 'database': match.group(1)}
if query_lower.startswith('use '):
match = re.search(r'use\s+[`"\']?([^`"\'\s;]+)[`"\']?', query_string, re.IGNORECASE)
if match:
return {'operation': 'USE_DATABASE', 'database': match.group(1)}
# Естественный язык
if op_type == 'SELECT':
return _parse_select(query_string)
if op_type == 'INSERT':
return _parse_insert(query_string)
if op_type == 'UPDATE':
return _parse_update(query_string)
if op_type == 'DELETE':
return _parse_delete(query_string)
if op_type == 'CREATE':
return _parse_create_table(query_string)
if op_type == 'CREATE_DATABASE':
return _parse_create_database(query_string)
if op_type == 'DROP':
return _parse_drop_table(query_string)
if op_type == 'ALTER':
return _parse_alter(query_string)
# Последняя попытка - пробуем распознать как SELECT
# Если запрос содержит только название таблицы
if re.match(rf'^{ID_PATTERN}$', query_string):
return {
'operation': 'SELECT',
'table': _normalize_table_name(query_string),
'columns': ['*'],
'condition': {}
}
return {
'error': f'Не удалось распознать команду: "{query_string}"',
'hint': 'Используйте естественный язык (например: "покажи все пользователи") или SQL запрос (например: "SELECT * FROM users")'
}