Skip to main content
Glama

n8n MCP Server

by CodeHalwell
app.py (3.77 kB)
"""HTTP facade exposing n8n workflow operations as FastAPI tool endpoints."""

from __future__ import annotations

from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Dict, Optional, Union

from fastapi import Depends, FastAPI, HTTPException
from pydantic import BaseModel

from client.n8n_client import N8nClient
from core.builder import WorkflowBuilder
from core.config import Settings
from core.specs import WorkflowSpec
from core.validator import validate_workflow
from mcp_server.utils import get_workflow_by_identifier, workflow_id_or_raise

app = FastAPI(title="n8n-mcp")

# Settings are read once at import time and shared by every client instance.
_settings = Settings.load_from_env()
_builder = WorkflowBuilder()


class CreateWorkflowRequest(BaseModel):
    """Request body for ``POST /tools/create_workflow``."""

    # Workflow specification to validate and build.
    spec: WorkflowSpec
    # When true (and ``commit`` is false) the built workflow is only returned.
    dry_run: bool = True
    # Forces creation even when ``dry_run`` is left at its default.
    commit: bool = False
    # Activate the workflow right after it has been created.
    activate: bool = False


class UpdateWorkflowRequest(BaseModel):
    """Request body for ``POST /tools/update_workflow``."""

    # Passed to ``get_workflow_by_identifier`` to locate the workflow.
    identifier: str
    # Forwarded unchanged to ``N8nClient.update_workflow``.
    patch: Dict[str, Any]


class ExecuteWorkflowRequest(BaseModel):
    """Request body for ``POST /tools/execute_workflow``."""

    # Passed to ``get_workflow_by_identifier`` to locate the workflow.
    identifier: str
    # Optional execution input; an empty dict is sent when omitted.
    payload: Optional[Dict[str, Any]] = None


def _client() -> N8nClient:
    """Build a new ``N8nClient`` from the module-level settings."""
    return N8nClient(_settings)


@asynccontextmanager
async def n8n_client_manager() -> AsyncIterator[N8nClient]:
    """Async context manager that yields an ``N8nClient`` and closes it on exit.

    Use as ``async with n8n_client_manager() as client:``. Do not pass this
    object to ``Depends`` directly: the ``@asynccontextmanager`` wrapper is a
    plain callable, so FastAPI would inject the un-entered context manager
    instead of the client. Routes use ``get_n8n_client`` instead.
    """
    client = _client()
    try:
        yield client
    finally:
        await client.close()


async def get_n8n_client() -> AsyncIterator[N8nClient]:
    """FastAPI dependency yielding an ``N8nClient`` for a single request.

    This is a bare async generator, which is the form FastAPI expects for
    dependencies with ``yield``; the client is closed once the request ends.
    """
    # NOTE(review): any ``app.dependency_overrides`` entry keyed on
    # ``n8n_client_manager`` must be re-keyed to ``get_n8n_client``.
    async with n8n_client_manager() as client:
        yield client


def _workflow_id(workflow: Dict[str, Any]) -> Union[str, int]:
    """Legacy wrapper around ``workflow_id_or_raise``.

    Supplies an error factory that turns the helper's message into an
    HTTP 502 ``HTTPException``.
    """
    return workflow_id_or_raise(
        workflow,
        lambda msg: HTTPException(status_code=502, detail=msg),
    )


@app.get("/health")
async def health(client: N8nClient = Depends(get_n8n_client)) -> Dict[str, Any]:
    """Return ``{"status": "ok"}`` together with the client's health info."""
    info = await client.health()
    return {"status": "ok", "info": info}


@app.post("/tools/create_workflow")
async def create_workflow(
    req: CreateWorkflowRequest,
    client: N8nClient = Depends(get_n8n_client),
) -> Dict[str, Any]:
    """Validate a workflow spec and optionally create it.

    Raises:
        HTTPException: 400 with the validator's errors when the spec is
            invalid; 409 when a workflow with the same name already exists.
    """
    spec = req.spec
    errors = validate_workflow(spec)
    if errors:
        raise HTTPException(status_code=400, detail=errors)

    workflow_json = _builder.build(spec)

    # ``dry_run`` defaults to True, so nothing is created unless the caller
    # sets ``commit`` or explicitly disables ``dry_run``.
    if req.dry_run and not req.commit:
        return {"validated": True, "workflow": workflow_json}

    existing = await client.list_workflows()
    if any(wf.get("name") == spec.name for wf in existing):
        raise HTTPException(status_code=409, detail="workflow name already exists")

    created = await client.create_workflow(workflow_json)
    if req.activate:
        await client.set_activation(_workflow_id(created), True)
    return {"validated": True, "workflow": created}


@app.post("/tools/update_workflow")
async def update_workflow(
    req: UpdateWorkflowRequest,
    client: N8nClient = Depends(get_n8n_client),
) -> Dict[str, Any]:
    """Look up a workflow by identifier and apply ``req.patch`` to it."""
    workflow = await get_workflow_by_identifier(client, req.identifier)
    updated = await client.update_workflow(_workflow_id(workflow), req.patch)
    return {"updated": updated}


@app.get("/tools/list_workflows")
async def list_workflows(
    client: N8nClient = Depends(get_n8n_client),
) -> Dict[str, Any]:
    """Return every workflow reported by the client."""
    workflows = await client.list_workflows()
    return {"workflows": workflows}


@app.get("/tools/get_workflow/{identifier}")
async def get_workflow(
    identifier: str,
    client: N8nClient = Depends(get_n8n_client),
) -> Dict[str, Any]:
    """Look up a workflow by identifier, then fetch it by its id."""
    workflow = await get_workflow_by_identifier(client, identifier)
    full = await client.get_workflow(_workflow_id(workflow))
    return {"workflow": full}


@app.post("/tools/execute_workflow")
async def execute_workflow(
    req: ExecuteWorkflowRequest,
    client: N8nClient = Depends(get_n8n_client),
) -> Dict[str, Any]:
    """Look up a workflow by identifier and execute it with ``req.payload``."""
    workflow = await get_workflow_by_identifier(client, req.identifier)
    # A missing payload is sent as an empty dict rather than ``None``.
    result = await client.execute_workflow(_workflow_id(workflow), req.payload or {})
    return {"result": result}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/CodeHalwell/n8n-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.