Skip to main content
Glama

n8n MCP Server

n8n_client.py4.7 kB
""" Клиент для работы с n8n API. Client for n8n API interactions. """ import json import logging from typing import Any, Dict, List, Optional import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from config import config logger = logging.getLogger(__name__) class N8NClient: """Клиент для взаимодействия с n8n API.""" def __init__(self): """Инициализация клиента n8n.""" self.base_url = config.n8n_base_url.rstrip("/") self.headers = config.get_n8n_headers() self.session = self._create_session() def _create_session(self) -> requests.Session: """Создание сессии с настройками retry.""" session = requests.Session() retry_strategy = Retry( total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], ) adapter = HTTPAdapter(max_retries=retry_strategy) session.mount("http://", adapter) session.mount("https://", adapter) return session def _make_request( self, method: str, endpoint: str, params: Optional[Dict[str, Any]] = None, json_data: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Выполнение HTTP-запроса к n8n API.""" url = f"{self.base_url}{endpoint}" logger.debug(f"Making {method} request to {url}") try: response = self.session.request( method=method, url=url, headers=self.headers, params=params, json=json_data, timeout=30 ) response.raise_for_status() # Для некоторых эндпоинтов может не быть JSON if response.content: return response.json() return {"success": True} except requests.exceptions.RequestException as e: logger.error(f"Request failed: {e}") raise Exception(f"n8n API request failed: {str(e)}") def list_workflows(self) -> List[Dict[str, Any]]: """Получение списка всех workflow.""" return self._make_request("GET", "/api/v1/workflows") def get_workflow(self, workflow_id: str) -> Dict[str, Any]: """Получение workflow по ID.""" return self._make_request("GET", f"/api/v1/workflows/{workflow_id}") def create_workflow(self, workflow_data: Dict[str, Any]) -> Dict[str, Any]: """Создание нового workflow.""" return self._make_request("POST", "/api/v1/workflows", json_data=workflow_data) def update_workflow(self, workflow_id: str, workflow_data: Dict[str, Any], method: str = "PATCH") -> Dict[str, Any]: """Обновление существующего workflow.""" # Получаем текущий workflow для получения всех полей current_workflow = self.get_workflow(workflow_id) # Создаём полные данные для обновления update_data = { "name": workflow_data.get("name", current_workflow.get("name")), "nodes": workflow_data.get("nodes", current_workflow.get("nodes", [])), "connections": workflow_data.get("connections", current_workflow.get("connections", {})), "settings": workflow_data.get("settings", current_workflow.get("settings", {})), "staticData": workflow_data.get("staticData", current_workflow.get("staticData")) } # Добавляем active если указано if "active" in workflow_data: update_data["active"] = workflow_data["active"] return self._make_request(method, f"/api/v1/workflows/{workflow_id}", json_data=update_data) def delete_workflow(self, workflow_id: str) -> Dict[str, Any]: """Удаление workflow.""" return self._make_request("DELETE", f"/api/v1/workflows/{workflow_id}") def execute_workflow(self, workflow_id: str, input_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Запуск workflow.""" data = {} if input_data: data["inputs"] = input_data return self._make_request("POST", f"/api/v1/workflows/{workflow_id}/run", json_data=data) def get_execution_status(self, execution_id: str) -> Dict[str, Any]: """Получение статуса выполнения workflow.""" return self._make_request("GET", f"/api/v1/executions/{execution_id}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Ospray-creator/n8n-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server