# -*- coding: utf-8 -*-
import sys
import json
import os
import traceback
import signal
# Принудительно устанавливаем UTF-8 для Windows
# Это важно для корректной работы с кириллицей через MCP
os.environ['PYTHONIOENCODING'] = 'utf-8'
# Установляем кодировку UTF-8 для всех операций ввода-вывода
# ВАЖНО: stdin тоже должен быть UTF-8 для корректной обработки кириллицы
import io
# Оборачиваем stdin в UTF-8
if sys.stdin.encoding != 'utf-8':
sys.stdin = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8', errors='replace')
# Оборачиваем stdout в UTF-8
if sys.stdout.encoding != 'utf-8':
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
# Оборачиваем stderr в UTF-8
if sys.stderr.encoding != 'utf-8':
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
# Добавляем родительскую директорию в путь для импортов
# Это необходимо когда скрипт запускается напрямую, а не как модуль
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Используем стандартные импорты
try:
from src.db_connector import DBConnector
from src.query_parser import parse_query
from src.config import DB_CONFIGS
except ImportError as e:
try:
print(f"[ERROR] Import error with src prefix: {e}", file=sys.stderr, flush=True)
except:
pass
# Пытаемся импортировать без префикса
try:
# Добавляем текущую директорию
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
sys.path.insert(0, current_dir)
from db_connector import DBConnector
from query_parser import parse_query
from config import DB_CONFIGS
except ImportError as e2:
print(f"[CRITICAL] Import failed: {e2}", file=sys.stderr, flush=True)
sys.exit(1)
def safe_json_dumps(obj, **kwargs):
"""Безопасно преобразует объект в JSON с гарантией правильной кодировки"""
# Устанавливаем параметры по умолчанию
# Используем ensure_ascii=True для максимальной совместимости
kwargs.setdefault('ensure_ascii', True)
kwargs.setdefault('default', str)
# Сначала попробуем обычное преобразование
try:
result = json.dumps(obj, **kwargs)
# Убедимся, что результат - это строка в правильной кодировке
if isinstance(result, bytes):
result = result.decode('utf-8', errors='replace')
return result
except (TypeError, ValueError) as e:
# Если ошибка - сериализуем с дополнительной обработкой
print(f"[DEBUG] JSON serialization fallback: {e}", file=sys.stderr)
# Пробуем снова с более агрессивными параметрами
def convert_value(val):
if isinstance(val, dict):
return {k: convert_value(v) for k, v in val.items()}
elif isinstance(val, (list, tuple)):
return [convert_value(v) for v in val]
elif isinstance(val, bytes):
try:
return val.decode('utf-8', errors='replace')
except:
return str(val)
else:
try:
# Пробуем кодировать как строку
return str(val)
except:
return repr(val)
converted_obj = convert_value(obj)
return json.dumps(converted_obj, ensure_ascii=True, default=str)
def load_config_from_file(config_file: str) -> dict:
"""
Загружает конфигурацию подключения к БД из файла.
Поддерживает форматы: JSON, TXT (key=value), INI-подобный
Примеры файлов:
JSON (config.json):
{
"host": "localhost",
"port": 3306,
"user": "root",
"password": "mypass",
"database": "mydb"
}
TXT/INI (config.txt):
host=localhost
port=3306
user=root
password=mypass
database=mydb
"""
if not os.path.exists(config_file):
raise FileNotFoundError(f"Файл конфигурации не найден: {config_file}")
with open(config_file, 'r', encoding='utf-8') as f:
content = f.read().strip()
# Пробуем JSON
if content.startswith('{'):
try:
return json.loads(content)
except json.JSONDecodeError as e:
raise ValueError(f"Ошибка парсинга JSON в файле {config_file}: {e}")
# Парсим как key=value (TXT/INI формат)
config = {}
for line in content.split('\n'):
line = line.strip()
# Пропускаем пустые строки и комментарии
if not line or line.startswith('#') or line.startswith(';'):
continue
# Убираем секции [section]
if line.startswith('[') and line.endswith(']'):
continue
# Парсим key=value или key: value
if '=' in line:
key, value = line.split('=', 1)
elif ':' in line:
key, value = line.split(':', 1)
else:
continue
key = key.strip().lower()
value = value.strip().strip('"').strip("'")
# Преобразуем числовые значения
if value.isdigit():
value = int(value)
# Маппинг альтернативных названий ключей
key_mapping = {
'db': 'database',
'db_name': 'database',
'dbname': 'database',
'name': 'database',
'database_name': 'database_name', # для MongoDB
'server': 'host',
'hostname': 'host',
'username': 'user',
'login': 'user',
'pass': 'password',
'pwd': 'password'
}
key = key_mapping.get(key, key)
config[key] = value
if not config:
raise ValueError(f"Не удалось загрузить конфигурацию из файла {config_file}")
return config
class MCPServer:
def __init__(self):
self.db_connector = DBConnector()
self.tools = self._register_tools()
self.resources = self._register_resources()
def _register_tools(self):
"""Регистрирует доступные инструменты (tools)"""
return {
"execute_query": {
"name": "execute_query",
"description": "Execute a natural language SQL-like query against a database. Supports SELECT, INSERT, UPDATE, DELETE, and ALTER operations on SQLite, MySQL, PostgreSQL, and MongoDB databases. IMPORTANT: UPDATE, DELETE, DROP, and ALTER operations require explicit user confirmation - set confirm_changes=true only when the user explicitly asks to modify/delete data. For batch operations, ask permission once and use the same confirmation for all queries in the batch.",
"inputSchema": {
"type": "object",
"properties": {
"db_type": {
"type": "string",
"description": "Type of database: 'sqlite', 'mysql', 'postgresql', or 'mongodb'",
"enum": ["sqlite", "mysql", "postgresql", "mongodb"]
},
"query": {
"type": "string",
"description": "SQL query or natural language query. Examples: 'SELECT * FROM users', 'Покажи все пользователи', 'INSERT INTO users (name) VALUES (\"Ivan\")'"
},
"db_config": {
"type": "object",
"description": "Database connection configuration. For MySQL: {host, port, user, password, database}. For PostgreSQL: {host, port, user, password, dbname}. For SQLite: {db_name}. For MongoDB: {uri, database_name}. Either db_config or config_file is required.",
"properties": {
"host": {"type": "string", "description": "Database host (default: localhost)"},
"port": {"type": "integer", "description": "Database port (MySQL: 3306, PostgreSQL: 5432)"},
"user": {"type": "string", "description": "Database username"},
"password": {"type": "string", "description": "Database password"},
"database": {"type": "string", "description": "Database name for MySQL"},
"dbname": {"type": "string", "description": "Database name for PostgreSQL"},
"db_name": {"type": "string", "description": "Database file path for SQLite"},
"uri": {"type": "string", "description": "MongoDB connection URI"},
"database_name": {"type": "string", "description": "Database name for MongoDB"}
}
},
"config_file": {
"type": "string",
"description": "Path to configuration file (JSON or TXT format). Alternative to db_config. Example: 'config.json' or 'C:/configs/mysql.txt'"
},
"confirm_changes": {
"type": "boolean",
"description": "REQUIRED for UPDATE/DELETE/DROP/ALTER operations. Set to true ONLY when the user EXPLICITLY requests to modify or delete data.",
"default": False
}
},
"required": ["db_type", "query"]
}
},
"list_tables": {
"name": "list_tables",
"description": "Get a list of all tables or collections in a database. Use this to explore database structure.",
"inputSchema": {
"type": "object",
"properties": {
"db_type": {
"type": "string",
"description": "Type of database: 'sqlite', 'mysql', 'postgresql', or 'mongodb'",
"enum": ["sqlite", "mysql", "postgresql", "mongodb"]
},
"db_config": {
"type": "object",
"description": "Database connection configuration. Either db_config or config_file is required.",
"properties": {
"host": {"type": "string"},
"port": {"type": "integer"},
"user": {"type": "string"},
"password": {"type": "string"},
"database": {"type": "string"},
"dbname": {"type": "string"},
"db_name": {"type": "string"},
"uri": {"type": "string"},
"database_name": {"type": "string"}
}
},
"config_file": {
"type": "string",
"description": "Path to configuration file (JSON or TXT format). Alternative to db_config."
}
},
"required": ["db_type"]
}
},
"get_table_schema": {
"name": "get_table_schema",
"description": "Get the schema (columns, types, constraints) of a specific table. Use this to understand table structure before querying.",
"inputSchema": {
"type": "object",
"properties": {
"db_type": {
"type": "string",
"description": "Type of database: 'sqlite', 'mysql', or 'postgresql' (not supported for MongoDB)",
"enum": ["sqlite", "mysql", "postgresql"]
},
"table_name": {
"type": "string",
"description": "Name of the table to get schema for"
},
"db_config": {
"type": "object",
"description": "Database connection configuration. Either db_config or config_file is required.",
"properties": {
"host": {"type": "string"},
"port": {"type": "integer"},
"user": {"type": "string"},
"password": {"type": "string"},
"database": {"type": "string"},
"dbname": {"type": "string"},
"db_name": {"type": "string"}
}
},
"config_file": {
"type": "string",
"description": "Path to configuration file (JSON or TXT format). Alternative to db_config."
}
},
"required": ["db_type", "table_name"]
}
}
}
def _register_resources(self):
"""Регистрирует доступные ресурсы"""
return {
"database_info": {
"uri": "database://info",
"name": "Информация о базах данных",
"description": "Информация о поддерживаемых типах баз данных и их возможностях"
},
"query_examples": {
"uri": "query://examples",
"name": "Примеры запросов",
"description": "Примеры SQL-подобных запросов на естественном языке"
}
}
def _register_prompts(self):
"""Регистрирует подсказки (prompts) для LLM"""
return {
"database_assistant": {
"name": "database_assistant",
"description": "Prompt to help LLM understand when and how to use the SQL MCP Server for database operations",
"arguments": [
{
"name": "task",
"description": "The user's task or question related to database operations"
}
]
}
}
def _safe_disconnect(self):
"""Безопасное отключение от БД без выбрасывания исключений"""
try:
if hasattr(self, 'db_connector') and self.db_connector:
self.db_connector.disconnect()
except Exception:
# Игнорируем все ошибки при отключении
pass
async def handle_initialize(self, params):
"""Обрабатывает запрос инициализации"""
return {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {"listChanged": False},
"resources": {"subscribe": False, "listChanged": False},
"prompts": {"listChanged": False},
"logging": {}
},
"serverInfo": {
"name": "SQL MCP Server",
"version": "1.0.0",
"description": "MCP server for database operations using natural language queries"
}
}
async def handle_list_tools(self):
"""Возвращает список доступных инструментов"""
return {
"tools": [
{
"name": tool_info["name"],
"description": tool_info["description"],
"inputSchema": tool_info["inputSchema"]
}
for tool_info in self.tools.values()
]
}
async def handle_list_resources(self):
"""Возвращает список доступных ресурсов"""
return {
"resources": [
{
"uri": resource_info["uri"],
"name": resource_info["name"],
"description": resource_info["description"],
"mimeType": "application/json"
}
for resource_info in self.resources.values()
]
}
async def handle_read_resource(self, uri):
"""Обрабатывает чтение ресурса"""
if uri == "database://info":
return {
"contents": [{
"uri": uri,
"mimeType": "application/json",
"text": safe_json_dumps({
"supported_databases": {
"sqlite": {
"description": "Легковесная файловая база данных",
"features": ["SELECT", "INSERT", "UPDATE", "DELETE", "ALTER"],
"connection": "По имени файла"
},
"mysql": {
"description": "Популярная реляционная база данных",
"features": ["SELECT", "INSERT", "UPDATE", "DELETE", "ALTER"],
"connection": "host:port/user/password/database"
},
"postgresql": {
"description": "Мощная объектно-реляционная база данных",
"features": ["SELECT", "INSERT", "UPDATE", "DELETE", "ALTER"],
"connection": "host:port/user/password/dbname"
},
"mongodb": {
"description": "Документо-ориентированная база данных",
"features": ["SELECT", "INSERT", "UPDATE", "DELETE"],
"connection": "URI"
}
},
"query_language": "Русский естественный язык",
"supported_operations": [
"SELECT - выборка данных",
"INSERT - добавление данных",
"UPDATE - обновление данных",
"DELETE - удаление данных",
"ALTER - изменение структуры таблицы"
]
}, ensure_ascii=False, indent=2)
}]
}
elif uri == "query://examples":
return {
"contents": [{
"uri": uri,
"mimeType": "application/json",
"text": safe_json_dumps({
"examples": [
{
"operation": "SELECT",
"query": "Покажи имя, должность из таблицы 'Сотрудники' где id:5",
"description": "Выборка определенных колонок с условием"
},
{
"operation": "SELECT",
"query": "Покажи * из таблицы 'Сотрудники'",
"description": "Выборка всех колонок"
},
{
"operation": "INSERT",
"query": "Добавь в таблицу 'Сотрудники' запись: имя:'Иван', должность:'Инженер'",
"description": "Добавление новой записи"
},
{
"operation": "UPDATE",
"query": "Обнови в таблице 'Сотрудники' где id:5, установи должность:'Ведущий инженер'",
"description": "Обновление записи по условию"
},
{
"operation": "DELETE",
"query": "Удали из таблицы 'Сотрудники' где id:5",
"description": "Удаление записи по условию"
},
{
"operation": "ALTER",
"query": "Добавь столбец 'email' тип 'TEXT' в таблицу 'Сотрудники'",
"description": "Добавление нового столбца"
}
]
}, ensure_ascii=False, indent=2)
}]
}
else:
raise ValueError(f"Неизвестный ресурс: {uri}")
async def handle_list_prompts(self):
"""Возвращает список доступных подсказок"""
return {
"prompts": [
{
"name": "database_assistant",
"description": "Help LLM understand when to use SQL MCP Server. Use this prompt when user asks questions about databases, data retrieval, or any database operation.",
"arguments": [
{
"name": "task",
"description": "The user's database-related task or question"
}
]
}
]
}
async def handle_get_prompt(self, name: str, arguments: dict):
"""Возвращает содержимое подсказки (prompt)"""
if name == "database_assistant":
task = arguments.get("task", "database operations")
system_prompt = f"""You are a database assistant. The user has asked about database operations.
User task: {task}
Available tools for database operations:
1. execute_query - Execute SQL-like queries in natural language (Russian)
- Supports: SELECT, INSERT, UPDATE, DELETE, ALTER
- Databases: SQLite, MySQL, PostgreSQL, MongoDB
- IMPORTANT: UPDATE, DELETE, DROP, and ALTER operations are BLOCKED by default!
2. list_tables - Get list of all tables/collections in a database
- Use this to explore database structure
3. get_table_schema - Get table schema (columns, types, constraints)
- Use this to understand table structure before querying
CRITICAL SAFETY RULES:
- UPDATE, DELETE, DROP, and ALTER operations require explicit user confirmation
- NEVER set confirm_changes=true unless the user EXPLICITLY asks to modify/delete data
- If you receive "status": "blocked" response, inform the user and ask for explicit confirmation
- Only after user confirms with words like "да, измени", "да, удали", "подтверждаю", "yes, update/delete" - then retry with confirm_changes=true
- FOR BATCH OPERATIONS: Ask permission ONCE, then use confirm_changes=true for ALL queries in the batch
When to use SQL MCP Server:
- User asks to retrieve, insert, update, or delete data
- User wants to explore database structure
- User needs to know table schemas or column information
- Any database-related operation
Query format in Russian:
- SELECT: "Покажи все пользователи" or "Покажи имя, email из таблицы 'users' где id:5"
- INSERT: "Добавь пользователя с именем Ivan и email ivan@example.com" (allowed without confirmation)
- UPDATE: "Обнови пользователя с id:5, установи email new@example.com" (requires confirm_changes=true!)
- DELETE: "Удали пользователя с id:5" (requires confirm_changes=true!)
- ALTER: "Добавь столбец 'age' тип 'INTEGER' в таблицу 'users'" (requires confirm_changes=true!)
Always use the SQL MCP Server tools for database operations!"""
return {
"messages": [
{
"role": "user",
"content": system_prompt
}
]
}
else:
raise ValueError(f"Неизвестная подсказка: {name}")
async def handle_call_tool(self, name, arguments):
"""Обрабатывает вызов инструмента"""
if name not in self.tools:
raise ValueError(f"Неизвестный инструмент: {name}")
try:
if name == "execute_query":
return await self._execute_query_tool(arguments)
elif name == "list_tables":
return await self._list_tables_tool(arguments)
elif name == "get_table_schema":
return await self._get_table_schema_tool(arguments)
else:
raise ValueError(f"Инструмент {name} не реализован")
except Exception as e:
# Возвращаем результат с ошибкой в формате MCP
return {
"content": [{
"type": "text",
"text": f"Ошибка выполнения инструмента '{name}': {str(e)}"
}],
"isError": True
}
finally:
# Убедимся, что соединение закрывается корректно, но не выбрасываем исключения
self._safe_disconnect()
async def _execute_query_tool(self, arguments):
"""Выполняет SQL-запрос"""
db_type = arguments["db_type"]
query_text = arguments["query"]
db_config = arguments.get("db_config")
config_file = arguments.get("config_file")
# Поддерживаем оба имени параметра для совместимости
confirm_changes = arguments.get("confirm_changes", arguments.get("confirm_destructive", False))
# Загружаем конфиг из файла если указан
if config_file:
try:
config = load_config_from_file(config_file)
except Exception as e:
raise ValueError(f"Ошибка загрузки конфига из файла '{config_file}': {e}")
elif db_config:
config = db_config
else:
raise ValueError("Either db_config or config_file is required. Please provide database connection configuration.")
try:
# Подключаемся к БД
self.db_connector.connect(db_type=db_type, db_config=config)
# Парсим запрос
parsed_query = parse_query(query_text)
if 'error' in parsed_query:
raise ValueError(parsed_query['error'])
# Проверяем операции, требующие подтверждения (изменение или удаление данных)
operation = parsed_query.get('operation', '').upper()
# Операции, изменяющие данные
modifying_operations = ['UPDATE', 'DELETE', 'DROP', 'DROP_TABLE', 'TRUNCATE', 'ALTER']
# Проверяем тип операции
requires_confirmation = operation in modifying_operations
# Также проверяем RAW_SQL на изменяющие команды
if operation == 'RAW_SQL':
raw_query = parsed_query.get('query', '').strip().upper()
modifying_sql_commands = ['UPDATE', 'DELETE', 'DROP', 'TRUNCATE', 'ALTER']
if any(raw_query.startswith(cmd) for cmd in modifying_sql_commands):
requires_confirmation = True
if requires_confirmation and not confirm_changes:
# Определяем тип операции для сообщения
if operation in ['DELETE', 'DROP', 'DROP_TABLE', 'TRUNCATE']:
action_type = "destructive"
action_desc = "delete/drop"
else:
action_type = "modifying"
action_desc = "update/alter"
return {
"content": [{
"type": "text",
"text": safe_json_dumps({
"status": "blocked",
"message": f"Data {action_type} operation blocked. {operation} operations require explicit user confirmation. If the user explicitly requested to {action_desc} data, retry with confirm_changes=true. For batch operations, ask permission ONCE and use the same confirmation for all queries.",
"operation": operation,
"query": query_text,
"parsed": parsed_query,
"hint": f"Ask the user to confirm they want to {action_desc} data before proceeding. For multiple operations, one confirmation is enough."
}, indent=2)
}],
"isError": False
}
# Выполняем запрос
result = self.db_connector.execute(parsed_query)
# Формируем ответ
return {
"content": [{
"type": "text",
"text": safe_json_dumps({
"status": "success",
"data": result,
"query": query_text,
"parsed": parsed_query
}, indent=2)
}]
}
except Exception as e:
# Возвращаем ошибку с деталями в формате MCP
# Убедимся, что сообщение об ошибке правильно кодируется
error_message = str(e)
try:
# Пытаемся кодировать как UTF-8 для гарантии правильной кодировки
if isinstance(error_message, bytes):
error_message = error_message.decode('utf-8', errors='replace')
except:
pass
return {
"content": [{
"type": "text",
"text": safe_json_dumps({
"status": "error",
"error": error_message,
"query": query_text
}, indent=2)
}],
"isError": True
}
finally:
# Безопасно отключаемся от БД
self._safe_disconnect()
async def _list_tables_tool(self, arguments):
"""Получает список таблиц"""
db_type = arguments["db_type"]
db_config = arguments.get("db_config")
config_file = arguments.get("config_file")
# Загружаем конфиг из файла если указан
if config_file:
try:
config = load_config_from_file(config_file)
except Exception as e:
raise ValueError(f"Ошибка загрузки конфига из файла '{config_file}': {e}")
elif db_config:
config = db_config
else:
raise ValueError("Either db_config or config_file is required. Please provide database connection configuration.")
try:
self.db_connector.connect(db_type=db_type, db_config=config)
# Для разных типов БД используем разные запросы
if db_type == "sqlite":
query = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
elif db_type == "mysql":
query = f"SHOW TABLES FROM {config.get('database')}"
elif db_type == "postgresql":
query = "SELECT tablename FROM pg_tables WHERE schemaname = 'public' ORDER BY tablename"
elif db_type == "mongodb":
# Для MongoDB возвращаем коллекции
db_name = config.get('database_name')
collections = self.db_connector.adapter.db.list_collection_names()
result = [{"name": coll} for coll in collections]
return {
"content": [{
"type": "text",
"text": safe_json_dumps({
"status": "success",
"tables": result,
"database": db_name
}, indent=2)
}]
}
else:
raise ValueError(f"Неподдерживаемый тип БД: {db_type}")
# Выполняем запрос для получения списка таблиц
tables_result = self.db_connector.adapter._execute_query(query)
tables = []
# Безопасное извлечение данных
for row in tables_result:
try:
table_name = None
if isinstance(row, (tuple, list)):
table_name = row[0] if len(row) > 0 else None
elif isinstance(row, dict):
# MySQL SHOW TABLES возвращает колонку типа 'Tables_in_database_name'
# PostgreSQL pg_tables возвращает 'tablename'
# SQLite sqlite_master возвращает 'name'
if 'name' in row:
table_name = row['name']
elif 'tablename' in row:
table_name = row['tablename']
else:
# Для MySQL берём первое значение из словаря
values = list(row.values())
table_name = values[0] if values else None
else:
table_name = str(row)
if table_name:
tables.append({"name": table_name})
except (TypeError, IndexError, AttributeError) as e:
# Пропускаем проблемные строки
pass
return {
"content": [{
"type": "text",
"text": safe_json_dumps({
"status": "success",
"tables": tables
}, indent=2)
}]
}
except Exception as e:
# Возвращаем ошибку с деталями в формате MCP
error_message = str(e)
try:
if isinstance(error_message, bytes):
error_message = error_message.decode('utf-8', errors='replace')
except:
pass
return {
"content": [{
"type": "text",
"text": safe_json_dumps({
"status": "error",
"error": error_message,
"type": type(e).__name__
}, indent=2)
}],
"isError": True
}
finally:
# Безопасно отключаемся от БД
self._safe_disconnect()
async def _get_table_schema_tool(self, arguments):
"""Получает схему таблицы"""
db_type = arguments["db_type"]
table_name = arguments["table_name"]
db_config = arguments.get("db_config")
config_file = arguments.get("config_file")
# Загружаем конфиг из файла если указан
if config_file:
try:
config = load_config_from_file(config_file)
except Exception as e:
raise ValueError(f"Ошибка загрузки конфига из файла '{config_file}': {e}")
elif db_config:
config = db_config
else:
raise ValueError("Either db_config or config_file is required. Please provide database connection configuration.")
try:
self.db_connector.connect(db_type=db_type, db_config=config)
# Для MongoDB схема не применима
if db_type == "mongodb":
raise ValueError("Схема не применима для MongoDB (документо-ориентированная БД)")
# Формируем запрос для получения схемы
if db_type == "sqlite":
query = f"PRAGMA table_info({table_name})"
elif db_type == "mysql":
query = f"DESCRIBE {table_name}"
elif db_type == "postgresql":
query = f"""
SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_name = '{table_name}'
ORDER BY ordinal_position
"""
else:
raise ValueError(f"Неподдерживаемый тип БД: {db_type}")
# Выполняем запрос
schema_result = self.db_connector.adapter._execute_query(query)
# Форматируем результат
schema = []
for row in schema_result:
if db_type == "sqlite":
schema.append({
"name": row["name"],
"type": row["type"],
"nullable": not row["notnull"],
"default": row["dflt_value"],
"primary_key": bool(row["pk"])
})
else:
schema.append({
"name": row.get("Field", row.get("column_name")),
"type": row.get("Type", row.get("data_type")),
"nullable": row.get("Null", row.get("is_nullable")) == "YES",
"default": row.get("Default", row.get("column_default")),
"primary_key": False # Нужно дополнительно проверить для первичных ключей
})
return {
"content": [{
"type": "text",
"text": safe_json_dumps({
"status": "success",
"table": table_name,
"schema": schema
}, indent=2)
}]
}
except Exception as e:
# Возвращаем ошибку с деталями в формате MCP
return {
"content": [{
"type": "text",
"text": safe_json_dumps({
"status": "error",
"error": str(e),
"table": table_name
}, indent=2)
}],
"isError": True
}
finally:
# Безопасно отключаемся от БД
self._safe_disconnect()
async def process_request(self, request):
"""Обрабатывает входящий MCP запрос"""
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
try:
if method == "initialize":
result = await self.handle_initialize(params)
elif method == "tools/list":
result = await self.handle_list_tools()
elif method == "resources/list":
result = await self.handle_list_resources()
elif method == "resources/read":
uri = params.get("uri")
result = await self.handle_read_resource(uri)
elif method == "prompts/list":
result = await self.handle_list_prompts()
elif method == "prompts/get":
prompt_name = params.get("name")
prompt_arguments = params.get("arguments", {})
result = await self.handle_get_prompt(prompt_name, prompt_arguments)
elif method == "tools/call":
tool_name = params.get("name")
tool_arguments = params.get("arguments", {})
result = await self.handle_call_tool(tool_name, tool_arguments)
else:
raise ValueError(f"Неизвестный метод: {method}")
# Если id отсутствует, это уведомление, и ответ отправлять не нужно
if request_id is None:
return None # Не отправляем ответ для уведомлений
response = {
"jsonrpc": "2.0",
"id": request_id,
"result": result
}
return response
except Exception as e:
# Если id отсутствует, это уведомление, и ответ отправлять не нужно
if request_id is None:
return None # Не отправляем ответ для уведомлений
error_response = {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32603,
"message": str(e),
"data": {
"type": type(e).__name__,
"traceback": traceback.format_exc()
}
}
}
return error_response
def main():
"""Основная функция MCP сервера"""
import asyncio
import traceback
# Устанавливаем обработку SIGPIPE для избежания ошибок при записи в закрытый pipe (только на Unix-системах)
try:
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
except AttributeError:
# На Windows нет SIGPIPE, пропускаем установку этого обработчика
pass
server = MCPServer()
try:
if not sys.stderr.closed:
print("Запуск SQL MCP сервера...", file=sys.stderr, flush=True)
except:
pass
request_count = 0
error_count = 0
try:
while True:
try:
# Пытаемся прочитать строку из stdin
try:
line = sys.stdin.readline()
if not line:
# EOF - клиент закрыл соединение
break
except (IOError, OSError, ValueError):
# stdin закрыт или ошибка чтения
break
if not line.strip():
continue
request_count += 1
try:
# Убедимся, что строка корректно декодирована из UTF-8
line_str = line.strip()
# Попытка исправить двойную кодировку (UTF-8 как Latin-1)
try:
# Проверяем, не закодирована ли строка повторно
if '\\u' not in line_str:
# Пробуем декодировать если это bytes маскирующиеся под строку
pass
except:
pass
request = json.loads(line_str)
# Рекурсивно исправляем кодировку в аргументах
def fix_encoding(obj):
if isinstance(obj, str):
try:
# Пробуем исправить "mojibake" - UTF-8 интерпретированный как CP1252/Latin-1
return obj.encode('latin-1').decode('utf-8')
except (UnicodeDecodeError, UnicodeEncodeError):
return obj
elif isinstance(obj, dict):
return {k: fix_encoding(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [fix_encoding(item) for item in obj]
return obj
# Исправляем кодировку в params если есть признаки mojibake
if isinstance(request, dict) and 'params' in request:
params = request['params']
# Проверяем есть ли признаки mojibake (типичные паттерны)
params_str = str(params)
if any(char in params_str for char in ['Ð', 'Ñ', 'Ð', '¡', '¤', '°']):
request['params'] = fix_encoding(params)
except json.JSONDecodeError as e:
# Ошибка парсинга JSON - продолжаем работу
try:
if not sys.stderr.closed:
print(f"[DEBUG] JSON decode error: {e}", file=sys.stderr, flush=True)
except:
pass
continue
response = None
try:
# Обрабатываем запрос с полной защитой
try:
response = asyncio.run(server.process_request(request))
except Exception as e:
# Ошибка обработки запроса - формируем ошибку
request_id = request.get("id") if isinstance(request, dict) else None
if request_id is not None:
response = {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32603,
"message": "Internal error",
"data": str(e)
}
}
error_count += 1
try:
if not sys.stderr.closed:
print(f"[DEBUG] Request processing error: {e}", file=sys.stderr, flush=True)
except:
pass
# Отправляем ответ если он есть
if response is not None:
try:
output = safe_json_dumps(response)
print(output, flush=True)
except (BrokenPipeError, IOError, OSError, ValueError):
# Pipe разорван - выходим корректно
break
except Exception as e:
# Неожиданная ошибка при отправке ответа
try:
if not sys.stderr.closed:
print(f"[DEBUG] Response send error: {e}", file=sys.stderr, flush=True)
except:
pass
break
except Exception as e:
# Неожиданная ошибка в обработке запроса
try:
if not sys.stderr.closed:
print(f"[DEBUG] Unexpected error: {e}\n{traceback.format_exc()}", file=sys.stderr, flush=True)
except:
pass
except KeyboardInterrupt:
break
except BrokenPipeError:
break
except (IOError, OSError):
break
except Exception as e:
try:
if not sys.stderr.closed:
print(f"[DEBUG] Main loop error: {e}", file=sys.stderr, flush=True)
except:
pass
break
except KeyboardInterrupt:
pass
except Exception:
pass
finally:
# Закрываем соединение с БД
try:
server._safe_disconnect()
except:
pass
try:
if not sys.stderr.closed:
print(f"SQL MCP сервер завершил работу (обработано {request_count} запросов, ошибок {error_count}).", file=sys.stderr, flush=True)
except:
pass
if __name__ == "__main__":
main()