Skip to main content
Glama
n8n_integration.py8.78 kB
""" n8n Integration Module Handles webhook requests from n8n and integrates with MCP tools """ import logging from typing import Dict, Any, Optional from datetime import datetime import aiohttp from fastapi import HTTPException from models import ( N8NWebhookRequest, N8NWebhookResponse, N8NConfig, ABCSystemConfig ) from tools.system_check import SystemCheckTool logger = logging.getLogger(__name__) class N8NIntegration: """ Handles integration with n8n workflows """ def __init__(self, n8n_config: N8NConfig, abc_config: ABCSystemConfig): self.config = n8n_config self.n8n_url = n8n_config.instance_url self.api_key = n8n_config.api_key # Initialize tools self.tools = { "check_system_abc": SystemCheckTool(abc_config) } logger.info(f"n8n Integration initialized for {self.n8n_url}") async def handle_webhook_request( self, webhook_id: str, request_data: Dict[str, Any] ) -> N8NWebhookResponse: """ Handle incoming webhook request from n8n Args: webhook_id: Webhook identifier request_data: Request payload from n8n Returns: N8NWebhookResponse with execution result """ start_time = datetime.utcnow() try: # Parse request webhook_request = N8NWebhookRequest( tool=request_data.get("tool", "check_system_abc"), params=request_data.get("params", {}), webhook_id=webhook_id, execution_id=request_data.get("executionId") ) logger.info( f"Processing n8n webhook: {webhook_id} " f"(tool: {webhook_request.tool}, execution: {webhook_request.execution_id})" ) # Execute tool result = await self._execute_tool( webhook_request.tool, webhook_request.params ) execution_time = (datetime.utcnow() - start_time).total_seconds() * 1000 # Build response response = N8NWebhookResponse( success=True, data=result, tool=webhook_request.tool, timestamp=datetime.utcnow(), execution_time_ms=execution_time ) logger.info( f"Webhook {webhook_id} completed successfully in {execution_time:.2f}ms" ) return response except Exception as e: execution_time = (datetime.utcnow() - start_time).total_seconds() * 1000 logger.error( f"Webhook {webhook_id} failed after {execution_time:.2f}ms: {e}", exc_info=True ) return N8NWebhookResponse( success=False, data=None, tool=request_data.get("tool", "unknown"), timestamp=datetime.utcnow(), execution_time_ms=execution_time, error=str(e) ) async def _execute_tool(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]: """ Execute a tool by name Args: tool_name: Name of the tool to execute params: Tool parameters Returns: Tool execution result Raises: HTTPException: If tool not found """ if tool_name not in self.tools: raise HTTPException( status_code=400, detail=f"Tool not found: {tool_name}" ) tool = self.tools[tool_name] result = await tool.execute(params) return result async def trigger_workflow( self, webhook_path: str, data: Dict[str, Any] ) -> Dict[str, Any]: """ Trigger an n8n workflow via webhook Args: webhook_path: Webhook path (e.g., 'mcp-system-check') data: Data to send to workflow Returns: n8n workflow response Raises: Exception: If workflow trigger fails """ url = f"https://{self.n8n_url}/webhook/{webhook_path}" logger.info(f"Triggering n8n workflow: {url}") try: headers = { "Content-Type": "application/json" } if self.api_key: headers["Authorization"] = f"Bearer {self.api_key}" async with aiohttp.ClientSession() as session: async with session.post(url, json=data, headers=headers) as response: if response.status >= 400: error_text = await response.text() raise Exception( f"n8n workflow trigger failed: {response.status} - {error_text}" ) result = await response.json() logger.info(f"n8n workflow triggered successfully: {webhook_path}") return result except aiohttp.ClientError as e: logger.error(f"Failed to trigger n8n workflow: {e}") raise Exception(f"n8n workflow trigger error: {str(e)}") async def create_webhook(self, workflow_id: str) -> str: """ Create a webhook in n8n (requires n8n API access) Args: workflow_id: n8n workflow ID Returns: Webhook URL Note: This requires n8n API access and is currently a placeholder """ if not self.api_key: raise Exception("n8n API key required for webhook creation") # This would use the n8n API to create a webhook # For now, return a placeholder webhook_path = f"webhook-{workflow_id}" logger.warning("create_webhook is a placeholder implementation") return f"https://{self.n8n_url}/webhook/{webhook_path}" async def get_workflow_executions( self, workflow_id: str, limit: int = 10 ) -> List[Dict[str, Any]]: """ Get recent executions for a workflow (requires n8n API access) Args: workflow_id: n8n workflow ID limit: Maximum number of executions to return Returns: List of workflow executions Note: This requires n8n API access and is currently a placeholder """ if not self.api_key: raise Exception("n8n API key required for execution history") # This would use the n8n API to get executions # For now, return empty list logger.warning("get_workflow_executions is a placeholder implementation") return [] def get_available_tools(self) -> List[Dict[str, Any]]: """ Get list of available tools for n8n integration Returns: List of tool definitions """ return [ { "name": name, "description": tool.description, "input_schema": tool.input_schema.model_dump() } for name, tool in self.tools.items() ] async def validate_webhook(self, webhook_id: str) -> bool: """ Validate if a webhook ID is valid Args: webhook_id: Webhook ID to validate Returns: True if valid, False otherwise """ # Simple validation - check if webhook_id matches expected pattern # In production, this should check against a database or n8n API if not webhook_id or len(webhook_id) < 3: return False return True async def send_notification( self, webhook_path: str, notification_type: str, message: str, data: Optional[Dict[str, Any]] = None ) -> bool: """ Send notification to n8n workflow Args: webhook_path: Webhook path for notifications notification_type: Type of notification (e.g., 'alert', 'info', 'error') message: Notification message data: Additional data Returns: True if sent successfully, False otherwise """ try: payload = { "type": notification_type, "message": message, "timestamp": datetime.utcnow().isoformat(), "data": data or {} } await self.trigger_workflow(webhook_path, payload) logger.info(f"Notification sent: {notification_type} - {message}") return True except Exception as e: logger.error(f"Failed to send notification: {e}") return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nguyenxtan/mcpwn8n'

If you have feedback or need assistance with the MCP directory API, please join our Discord server