n8n_client.py•4.7 kB
"""
Клиент для работы с n8n API.
Client for n8n API interactions.
"""
import json
import logging
from typing import Any, Dict, List, Optional
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from config import config
logger = logging.getLogger(__name__)
class N8NClient:
"""Клиент для взаимодействия с n8n API."""
def __init__(self):
"""Инициализация клиента n8n."""
self.base_url = config.n8n_base_url.rstrip("/")
self.headers = config.get_n8n_headers()
self.session = self._create_session()
def _create_session(self) -> requests.Session:
"""Создание сессии с настройками retry."""
session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def _make_request(
self,
method: str,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
json_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Выполнение HTTP-запроса к n8n API."""
url = f"{self.base_url}{endpoint}"
logger.debug(f"Making {method} request to {url}")
try:
response = self.session.request(
method=method,
url=url,
headers=self.headers,
params=params,
json=json_data,
timeout=30
)
response.raise_for_status()
# Для некоторых эндпоинтов может не быть JSON
if response.content:
return response.json()
return {"success": True}
except requests.exceptions.RequestException as e:
logger.error(f"Request failed: {e}")
raise Exception(f"n8n API request failed: {str(e)}")
def list_workflows(self) -> List[Dict[str, Any]]:
"""Получение списка всех workflow."""
return self._make_request("GET", "/api/v1/workflows")
def get_workflow(self, workflow_id: str) -> Dict[str, Any]:
"""Получение workflow по ID."""
return self._make_request("GET", f"/api/v1/workflows/{workflow_id}")
def create_workflow(self, workflow_data: Dict[str, Any]) -> Dict[str, Any]:
"""Создание нового workflow."""
return self._make_request("POST", "/api/v1/workflows", json_data=workflow_data)
def update_workflow(self, workflow_id: str, workflow_data: Dict[str, Any], method: str = "PATCH") -> Dict[str, Any]:
"""Обновление существующего workflow."""
# Получаем текущий workflow для получения всех полей
current_workflow = self.get_workflow(workflow_id)
# Создаём полные данные для обновления
update_data = {
"name": workflow_data.get("name", current_workflow.get("name")),
"nodes": workflow_data.get("nodes", current_workflow.get("nodes", [])),
"connections": workflow_data.get("connections", current_workflow.get("connections", {})),
"settings": workflow_data.get("settings", current_workflow.get("settings", {})),
"staticData": workflow_data.get("staticData", current_workflow.get("staticData"))
}
# Добавляем active если указано
if "active" in workflow_data:
update_data["active"] = workflow_data["active"]
return self._make_request(method, f"/api/v1/workflows/{workflow_id}", json_data=update_data)
def delete_workflow(self, workflow_id: str) -> Dict[str, Any]:
"""Удаление workflow."""
return self._make_request("DELETE", f"/api/v1/workflows/{workflow_id}")
def execute_workflow(self, workflow_id: str, input_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Запуск workflow."""
data = {}
if input_data:
data["inputs"] = input_data
return self._make_request("POST", f"/api/v1/workflows/{workflow_id}/run", json_data=data)
def get_execution_status(self, execution_id: str) -> Dict[str, Any]:
"""Получение статуса выполнения workflow."""
return self._make_request("GET", f"/api/v1/executions/{execution_id}")