Skip to main content
Glama

n8n MCP Server

server.py11.6 kB
""" Основной MCP-сервер для управления n8n workflow. Main MCP server for n8n workflow management. """ import logging from typing import Any, Dict, List, Optional from mcp.server.fastmcp import FastMCP from config import config from n8n_client import N8NClient # Настройка логирования logging.basicConfig( level=getattr(logging, config.log_level), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # Создаём экземпляр FastMCP server = FastMCP(name=config.server_name) n8n_client = N8NClient() # Импортируем функции для инструментов (только те, которых нет в server.py) # from tools import ( # list_node_categories, # get_nodes_by_category, # get_node_info # ) # Регистрируем инструменты # server.add_tool(list_node_categories) # server.add_tool(get_nodes_by_category) # server.add_tool(get_node_info) @server.tool() def list_workflows() -> List[Dict[str, Any]]: """ Возвращает список всех workflow с базовой информацией. Возвращает: List[Dict[str, Any]]: Список workflow с полями id, name, active и т.д. """ try: workflows = n8n_client.list_workflows() if isinstance(workflows, dict) and "data" in workflows: return workflows["data"] return workflows except Exception as e: raise Exception(f"Ошибка получения списка workflow: {str(e)}") @server.tool() def get_workflow(workflow_id: str) -> Dict[str, Any]: """ Получает полное описание workflow по ID. Параметры: workflow_id (str): ID workflow в n8n Возвращает: Dict[str, Any]: Полный JSON workflow """ try: return n8n_client.get_workflow(workflow_id) except Exception as e: raise Exception(f"Ошибка получения workflow {workflow_id}: {str(e)}") @server.tool() def create_workflow( name: str, nodes: List[Dict[str, Any]], connections: Optional[Dict[str, Any]] = None, settings: Optional[Dict[str, Any]] = None, staticData: Optional[Dict[str, Any]] = None, active: bool = False, ) -> Dict[str, Any]: """ Создаёт новый workflow в n8n. Параметры: name (str): Название workflow nodes (List[Dict[str, Any]]): Массив узлов в формате n8n JSON (обязательный) connections (Dict[str, Any], optional): Связи между узлами settings (Dict[str, Any], optional): Настройки workflow staticData (Dict[str, Any], optional): Статические данные active (bool): Активен ли workflow после создания (по умолчанию False) Возвращает: Dict[str, Any]: Созданный workflow с ID """ workflow_data = { "name": name, "nodes": nodes if nodes is not None else [], "connections": connections if connections is not None else {}, "settings": settings if settings is not None else {} } if staticData is not None: workflow_data["staticData"] = staticData try: return n8n_client.create_workflow(workflow_data) except Exception as e: raise Exception(f"Ошибка создания workflow: {str(e)}") @server.tool() def update_workflow( self, workflow_id: str, name: Optional[str] = None, nodes: Optional[List[Dict[str, Any]]] = None, connections: Optional[Dict[str, Any]] = None, active: Optional[bool] = None ) -> Dict[str, Any]: """ Обновляет существующий workflow. Параметры: workflow_id (str): ID workflow для обновления name (str, optional): Новое название nodes (List[Dict[str, Any]], optional): Новые узлы connections (Dict[str, Any]], optional): Новые связи active (bool, optional): Новый статус активности Возвращает: Dict[str, Any]: Обновлённый workflow """ # Получаем текущий workflow для получения всех полей try: current_workflow = self.n8n_client.get_workflow(workflow_id) except Exception as e: raise Exception(f"Ошибка получения workflow {workflow_id}: {str(e)}") # Создаём данные для обновления на основе текущего workflow update_data = { "name": name if name is not None else current_workflow.get("name"), "nodes": nodes if nodes is not None else current_workflow.get("nodes", []), "connections": connections if connections is not None else current_workflow.get("connections", {}), "settings": current_workflow.get("settings", {}), "staticData": current_workflow.get("staticData") } if active is not None: update_data["active"] = active try: return self.n8n_client.update_workflow(workflow_id, update_data, method='PUT') except Exception as e: raise Exception(f"Ошибка обновления workflow {workflow_id}: {str(e)}") @server.tool() def delete_workflow(workflow_id: str) -> Dict[str, Any]: """ Удаляет workflow по ID. Параметры: workflow_id (str): ID workflow для удаления Возвращает: Dict[str, Any]: Статус удаления """ try: return n8n_client.delete_workflow(workflow_id) except Exception as e: raise Exception(f"Ошибка удаления workflow {workflow_id}: {str(e)}") @server.tool() def execute_workflow( workflow_id: str, input_data: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Запускает workflow вручную. Параметры: workflow_id (str): ID workflow для запуска input_data (Dict[str, Any], optional): Данные для передачи в workflow Возвращает: Dict[str, Any]: ID выполнения и статус """ try: return n8n_client.execute_workflow(workflow_id, input_data) except Exception as e: raise Exception(f"Ошибка запуска workflow {workflow_id}: {str(e)}") @server.tool() def get_execution_status(execution_id: str) -> Dict[str, Any]: """ Получает статус выполнения workflow. Параметры: execution_id (str): ID выполнения Возвращает: Dict[str, Any]: Статус выполнения """ try: return n8n_client.get_execution_status(execution_id) except Exception as e: raise Exception(f"Ошибка получения статуса выполнения {execution_id}: {str(e)}") @server.tool() def list_node_categories() -> List[Dict[str, Any]]: """ Возвращает список категорий доступных узлов n8n. Returns: Список категорий с информацией о количестве узлов в каждой категории. """ try: from nodes_module import get_node_categories return get_node_categories() except Exception as e: raise ValueError(f"Ошибка получения категорий узлов: {e}") @server.tool() def get_nodes_by_category(category_id: str) -> List[Dict[str, Any]]: """ Возвращает список узлов для указанной категории. Args: category_id: ID категории (core, trigger, action, transform, aggregate) Returns: Список узлов с полной информацией о параметрах и настройках. """ try: from nodes_module import get_nodes_by_category nodes = list(get_nodes_by_category(category_id)) if not nodes: raise ValueError(f"Категория '{category_id}' не найдена или не содержит узлов") return nodes except Exception as e: raise ValueError(f"Ошибка получения узлов категории '{category_id}': {e}") @server.tool() def get_node_info(node_type: str) -> Dict[str, Any]: """ Возвращает подробную информацию об узле по его типу. Args: node_type: Тип узла (например, "n8n-nodes-base.set") Returns: Информация об узле включая параметры и категорию. """ try: from nodes_module import get_node_by_type node_info = get_node_by_type(node_type) if not node_info: raise ValueError(f"Узел с типом '{node_type}' не найден") return node_info except Exception as e: raise ValueError(f"Ошибка получения информации об узле '{node_type}': {e}") @server.tool() def search_nodes(query: str) -> Dict[str, Any]: """ Поиск узлов по названию или описанию. Args: query: Поисковый запрос Returns: Dict[str, Any]: Объект с ключом 'result' содержащий список узлов """ try: from nodes_module import get_all_nodes all_nodes = get_all_nodes() query_lower = query.lower() matching_nodes = [] for node in all_nodes: if (query_lower in node["name"].lower() or query_lower in node.get("description", "").lower() or query_lower in node["type"].lower()): matching_nodes.append(node) # Возвращаем объект с ключом 'result' return {"result": matching_nodes} except Exception as e: # В случае ошибки возвращаем пустой результат print(f"Ошибка поиска узлов по запросу '{query}': {e}") return {"result": []} class N8NMcpServer: """MCP-сервер для управления n8n workflow.""" def __init__(self): """Инициализация MCP-сервера.""" logger.info("MCP-сервер инициализирован") def run(self, transport: str = "stdio", host: str = "localhost", port: int = 9876): """Запуск MCP-сервера.""" logger.info(f"Запуск MCP-сервера: {config.server_name}") if transport == "stdio": server.run() else: raise ValueError(f"Транспорт {transport} не поддерживается. Используйте stdio.") # Глобальный экземпляр сервера n8n_mcp_server = N8NMcpServer() def main(): """Точка входа для запуска сервера.""" import sys transport = sys.argv[1] if len(sys.argv) > 1 else "stdio" if transport == "stdio": n8n_mcp_server.run("stdio") else: print("Использование: python server.py [stdio]") print(" stdio - стандартный ввод/вывод (рекомендуется)") sys.exit(1) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Ospray-creator/n8n-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server