app.py•3.77 kB
from __future__ import annotations
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Dict, Optional, Union
from fastapi import Depends, FastAPI, HTTPException
from pydantic import BaseModel
from client.n8n_client import N8nClient
from core.builder import WorkflowBuilder
from core.config import Settings
from core.specs import WorkflowSpec
from core.validator import validate_workflow
from mcp_server.utils import get_workflow_by_identifier, workflow_id_or_raise
app = FastAPI(title="n8n-mcp")
_settings = Settings.load_from_env()
_builder = WorkflowBuilder()
class CreateWorkflowRequest(BaseModel):
    spec: WorkflowSpec
    dry_run: bool = True
    commit: bool = False
    activate: bool = False
class UpdateWorkflowRequest(BaseModel):
    identifier: str
    patch: Dict[str, Any]
class ExecuteWorkflowRequest(BaseModel):
    identifier: str
    payload: Optional[Dict[str, Any]] = None
def _client() -> N8nClient:
    return N8nClient(_settings)
@asynccontextmanager
async def n8n_client_manager() -> AsyncIterator[N8nClient]:
    """FastAPI dependency for managing N8nClient lifecycle."""
    client = _client()
    try:
        yield client
    finally:
        await client.close()
def _workflow_id(workflow: Dict[str, Any]) -> Union[str, int]:
    """Legacy wrapper for backward compatibility."""
    return workflow_id_or_raise(
        workflow, 
        lambda msg: HTTPException(status_code=502, detail=msg)
    )
@app.get("/health")
async def health(client: N8nClient = Depends(n8n_client_manager)) -> Dict[str, Any]:
    info = await client.health()
    return {"status": "ok", "info": info}
@app.post("/tools/create_workflow")
async def create_workflow(
    req: CreateWorkflowRequest,
    client: N8nClient = Depends(n8n_client_manager)
) -> Dict[str, Any]:
    spec = req.spec
    errors = validate_workflow(spec)
    if errors:
        raise HTTPException(status_code=400, detail=errors)
    workflow_json = _builder.build(spec)
    if req.dry_run and not req.commit:
        return {"validated": True, "workflow": workflow_json}
    existing = await client.list_workflows()
    existing_names = [wf.get("name") for wf in existing]
    if spec.name in existing_names:
        raise HTTPException(status_code=409, detail="workflow name already exists")
    created = await client.create_workflow(workflow_json)
    if req.activate:
        await client.set_activation(_workflow_id(created), True)
    return {"validated": True, "workflow": created}
@app.post("/tools/update_workflow")
async def update_workflow(
    req: UpdateWorkflowRequest,
    client: N8nClient = Depends(n8n_client_manager)
) -> Dict[str, Any]:
    workflow = await get_workflow_by_identifier(client, req.identifier)
    updated = await client.update_workflow(_workflow_id(workflow), req.patch)
    return {"updated": updated}
@app.get("/tools/list_workflows")
async def list_workflows(client: N8nClient = Depends(n8n_client_manager)) -> Dict[str, Any]:
    workflows = await client.list_workflows()
    return {"workflows": workflows}
@app.get("/tools/get_workflow/{identifier}")
async def get_workflow(
    identifier: str,
    client: N8nClient = Depends(n8n_client_manager)
) -> Dict[str, Any]:
    workflow = await get_workflow_by_identifier(client, identifier)
    full = await client.get_workflow(_workflow_id(workflow))
    return {"workflow": full}
@app.post("/tools/execute_workflow")
async def execute_workflow(
    req: ExecuteWorkflowRequest,
    client: N8nClient = Depends(n8n_client_manager)
) -> Dict[str, Any]:
    workflow = await get_workflow_by_identifier(client, req.identifier)
    result = await client.execute_workflow(_workflow_id(workflow), req.payload or {})
    return {"result": result}