n8n_client.py•2.55 kB
from __future__ import annotations
import httpx
from typing import Any, Dict, List, Union, cast
from core.config import Settings
class N8nClient:
def __init__(self, settings: Settings) -> None:
self._base_url = f"{settings.n8n_api_url}/api/v1"
self._headers = {"X-N8N-API-KEY": settings.n8n_api_key}
self._client = httpx.AsyncClient(
base_url=self._base_url,
headers=self._headers,
timeout=30.0,
)
async def close(self) -> None:
await self._client.aclose()
async def health(self) -> Dict[str, Any]:
resp = await self._client.get("/health")
resp.raise_for_status()
return cast(Dict[str, Any], resp.json())
async def list_workflows(self) -> List[Dict[str, Any]]:
resp = await self._client.get("/workflows")
resp.raise_for_status()
data = cast(Dict[str, Any], resp.json())
raw = data.get("data", [])
return [cast(Dict[str, Any], item) for item in raw]
async def get_workflow(self, workflow_id: Union[str, int]) -> Dict[str, Any]:
resp = await self._client.get(f"/workflows/{workflow_id}")
resp.raise_for_status()
payload = cast(Dict[str, Any], resp.json())
inner = payload.get("data")
if isinstance(inner, dict):
return inner
return payload
async def create_workflow(self, workflow_json: Dict[str, Any]) -> Dict[str, Any]:
resp = await self._client.post("/workflows", json=workflow_json)
resp.raise_for_status()
return cast(Dict[str, Any], resp.json())
async def update_workflow(
self, workflow_id: Union[str, int], patch: Dict[str, Any]
) -> Dict[str, Any]:
resp = await self._client.patch(f"/workflows/{workflow_id}", json=patch)
resp.raise_for_status()
return cast(Dict[str, Any], resp.json())
async def set_activation(
self, workflow_id: Union[str, int], active: bool
) -> Dict[str, Any]:
endpoint = "activate" if active else "deactivate"
resp = await self._client.post(f"/workflows/{workflow_id}/{endpoint}")
resp.raise_for_status()
return cast(Dict[str, Any], resp.json())
async def execute_workflow(
self, workflow_id: Union[str, int], payload: Dict[str, Any]
) -> Dict[str, Any]:
resp = await self._client.post(
f"/workflows/{workflow_id}/run",
json=payload,
)
resp.raise_for_status()
return cast(Dict[str, Any], resp.json())