server.py•11.6 kB
"""
Основной MCP-сервер для управления n8n workflow.
Main MCP server for n8n workflow management.
"""
import logging
from typing import Any, Dict, List, Optional
from mcp.server.fastmcp import FastMCP
from config import config
from n8n_client import N8NClient
# Настройка логирования
logging.basicConfig(
level=getattr(logging, config.log_level),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Создаём экземпляр FastMCP
server = FastMCP(name=config.server_name)
n8n_client = N8NClient()
# Импортируем функции для инструментов (только те, которых нет в server.py)
# from tools import (
# list_node_categories,
# get_nodes_by_category,
# get_node_info
# )
# Регистрируем инструменты
# server.add_tool(list_node_categories)
# server.add_tool(get_nodes_by_category)
# server.add_tool(get_node_info)
@server.tool()
def list_workflows() -> List[Dict[str, Any]]:
"""
Возвращает список всех workflow с базовой информацией.
Возвращает:
List[Dict[str, Any]]: Список workflow с полями id, name, active и т.д.
"""
try:
workflows = n8n_client.list_workflows()
if isinstance(workflows, dict) and "data" in workflows:
return workflows["data"]
return workflows
except Exception as e:
raise Exception(f"Ошибка получения списка workflow: {str(e)}")
@server.tool()
def get_workflow(workflow_id: str) -> Dict[str, Any]:
"""
Получает полное описание workflow по ID.
Параметры:
workflow_id (str): ID workflow в n8n
Возвращает:
Dict[str, Any]: Полный JSON workflow
"""
try:
return n8n_client.get_workflow(workflow_id)
except Exception as e:
raise Exception(f"Ошибка получения workflow {workflow_id}: {str(e)}")
@server.tool()
def create_workflow(
name: str,
nodes: List[Dict[str, Any]],
connections: Optional[Dict[str, Any]] = None,
settings: Optional[Dict[str, Any]] = None,
staticData: Optional[Dict[str, Any]] = None,
active: bool = False,
) -> Dict[str, Any]:
"""
Создаёт новый workflow в n8n.
Параметры:
name (str): Название workflow
nodes (List[Dict[str, Any]]): Массив узлов в формате n8n JSON (обязательный)
connections (Dict[str, Any], optional): Связи между узлами
settings (Dict[str, Any], optional): Настройки workflow
staticData (Dict[str, Any], optional): Статические данные
active (bool): Активен ли workflow после создания (по умолчанию False)
Возвращает:
Dict[str, Any]: Созданный workflow с ID
"""
workflow_data = {
"name": name,
"nodes": nodes if nodes is not None else [],
"connections": connections if connections is not None else {},
"settings": settings if settings is not None else {}
}
if staticData is not None:
workflow_data["staticData"] = staticData
try:
return n8n_client.create_workflow(workflow_data)
except Exception as e:
raise Exception(f"Ошибка создания workflow: {str(e)}")
@server.tool()
def update_workflow(
self,
workflow_id: str,
name: Optional[str] = None,
nodes: Optional[List[Dict[str, Any]]] = None,
connections: Optional[Dict[str, Any]] = None,
active: Optional[bool] = None
) -> Dict[str, Any]:
"""
Обновляет существующий workflow.
Параметры:
workflow_id (str): ID workflow для обновления
name (str, optional): Новое название
nodes (List[Dict[str, Any]], optional): Новые узлы
connections (Dict[str, Any]], optional): Новые связи
active (bool, optional): Новый статус активности
Возвращает:
Dict[str, Any]: Обновлённый workflow
"""
# Получаем текущий workflow для получения всех полей
try:
current_workflow = self.n8n_client.get_workflow(workflow_id)
except Exception as e:
raise Exception(f"Ошибка получения workflow {workflow_id}: {str(e)}")
# Создаём данные для обновления на основе текущего workflow
update_data = {
"name": name if name is not None else current_workflow.get("name"),
"nodes": nodes if nodes is not None else current_workflow.get("nodes", []),
"connections": connections if connections is not None else current_workflow.get("connections", {}),
"settings": current_workflow.get("settings", {}),
"staticData": current_workflow.get("staticData")
}
if active is not None:
update_data["active"] = active
try:
return self.n8n_client.update_workflow(workflow_id, update_data, method='PUT')
except Exception as e:
raise Exception(f"Ошибка обновления workflow {workflow_id}: {str(e)}")
@server.tool()
def delete_workflow(workflow_id: str) -> Dict[str, Any]:
"""
Удаляет workflow по ID.
Параметры:
workflow_id (str): ID workflow для удаления
Возвращает:
Dict[str, Any]: Статус удаления
"""
try:
return n8n_client.delete_workflow(workflow_id)
except Exception as e:
raise Exception(f"Ошибка удаления workflow {workflow_id}: {str(e)}")
@server.tool()
def execute_workflow(
workflow_id: str,
input_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Запускает workflow вручную.
Параметры:
workflow_id (str): ID workflow для запуска
input_data (Dict[str, Any], optional): Данные для передачи в workflow
Возвращает:
Dict[str, Any]: ID выполнения и статус
"""
try:
return n8n_client.execute_workflow(workflow_id, input_data)
except Exception as e:
raise Exception(f"Ошибка запуска workflow {workflow_id}: {str(e)}")
@server.tool()
def get_execution_status(execution_id: str) -> Dict[str, Any]:
"""
Получает статус выполнения workflow.
Параметры:
execution_id (str): ID выполнения
Возвращает:
Dict[str, Any]: Статус выполнения
"""
try:
return n8n_client.get_execution_status(execution_id)
except Exception as e:
raise Exception(f"Ошибка получения статуса выполнения {execution_id}: {str(e)}")
@server.tool()
def list_node_categories() -> List[Dict[str, Any]]:
"""
Возвращает список категорий доступных узлов n8n.
Returns:
Список категорий с информацией о количестве узлов в каждой категории.
"""
try:
from nodes_module import get_node_categories
return get_node_categories()
except Exception as e:
raise ValueError(f"Ошибка получения категорий узлов: {e}")
@server.tool()
def get_nodes_by_category(category_id: str) -> List[Dict[str, Any]]:
"""
Возвращает список узлов для указанной категории.
Args:
category_id: ID категории (core, trigger, action, transform, aggregate)
Returns:
Список узлов с полной информацией о параметрах и настройках.
"""
try:
from nodes_module import get_nodes_by_category
nodes = list(get_nodes_by_category(category_id))
if not nodes:
raise ValueError(f"Категория '{category_id}' не найдена или не содержит узлов")
return nodes
except Exception as e:
raise ValueError(f"Ошибка получения узлов категории '{category_id}': {e}")
@server.tool()
def get_node_info(node_type: str) -> Dict[str, Any]:
"""
Возвращает подробную информацию об узле по его типу.
Args:
node_type: Тип узла (например, "n8n-nodes-base.set")
Returns:
Информация об узле включая параметры и категорию.
"""
try:
from nodes_module import get_node_by_type
node_info = get_node_by_type(node_type)
if not node_info:
raise ValueError(f"Узел с типом '{node_type}' не найден")
return node_info
except Exception as e:
raise ValueError(f"Ошибка получения информации об узле '{node_type}': {e}")
@server.tool()
def search_nodes(query: str) -> Dict[str, Any]:
"""
Поиск узлов по названию или описанию.
Args:
query: Поисковый запрос
Returns:
Dict[str, Any]: Объект с ключом 'result' содержащий список узлов
"""
try:
from nodes_module import get_all_nodes
all_nodes = get_all_nodes()
query_lower = query.lower()
matching_nodes = []
for node in all_nodes:
if (query_lower in node["name"].lower() or
query_lower in node.get("description", "").lower() or
query_lower in node["type"].lower()):
matching_nodes.append(node)
# Возвращаем объект с ключом 'result'
return {"result": matching_nodes}
except Exception as e:
# В случае ошибки возвращаем пустой результат
print(f"Ошибка поиска узлов по запросу '{query}': {e}")
return {"result": []}
class N8NMcpServer:
"""MCP-сервер для управления n8n workflow."""
def __init__(self):
"""Инициализация MCP-сервера."""
logger.info("MCP-сервер инициализирован")
def run(self, transport: str = "stdio", host: str = "localhost", port: int = 9876):
"""Запуск MCP-сервера."""
logger.info(f"Запуск MCP-сервера: {config.server_name}")
if transport == "stdio":
server.run()
else:
raise ValueError(f"Транспорт {transport} не поддерживается. Используйте stdio.")
# Глобальный экземпляр сервера
n8n_mcp_server = N8NMcpServer()
def main():
"""Точка входа для запуска сервера."""
import sys
transport = sys.argv[1] if len(sys.argv) > 1 else "stdio"
if transport == "stdio":
n8n_mcp_server.run("stdio")
else:
print("Использование: python server.py [stdio]")
print(" stdio - стандартный ввод/вывод (рекомендуется)")
sys.exit(1)
if __name__ == "__main__":
main()