mcp_server.py•6.13 kB
#!/usr/bin/env python3
import os
import httpx
from typing import Dict, Any, Optional, List
import asyncio
from mcp.server.fastmcp import FastMCP
# Initialize the MCP server
mcp = FastMCP("Rossum MCP Server")
# Rossum API configuration
ROSSUM_API_BASE = os.getenv("ROSSUM_API_BASE", "https://api.elis.rossum.ai/v1")
ROSSUM_API_KEY = os.getenv("ROSSUM_API_KEY")
if not ROSSUM_API_KEY:
raise ValueError("ROSSUM_API_KEY environment variable must be set")
# Reusable HTTPX client
client = httpx.AsyncClient()
async def get_rossum_headers() -> Dict[str, str]:
return {
"Authorization": f"Token {ROSSUM_API_KEY}",
"Content-Type": "application/json"
}
async def _rossum_request(method: str, path: str, **kwargs) -> Any:
"""Make a request to the Rossum API"""
try:
response = await client.request(
method=method,
url=f"{ROSSUM_API_BASE}{path}",
headers=await get_rossum_headers(),
**kwargs
)
response.raise_for_status()
# Handle cases where Rossum might return empty body on success (e.g., 204)
if response.status_code == 204:
return None
return response.json()
except httpx.HTTPStatusError as e:
# Get error detail from response if possible
error_detail = str(e)
try:
error_detail = e.response.json()
except Exception:
pass
raise Exception(f"Rossum API error {e.response.status_code}: {error_detail}")
except httpx.RequestError as e:
raise Exception(f"Rossum API request failed: {str(e)}")
# --- Tool Implementation Functions ---
async def _rossum_unpaginated_request(method: str, path: str, summary_keys: Optional[List[str]] = None) -> List[Dict[str, Any]]:
"""Make a request to the Rossum API, handling pagination and summarization."""
all_items = []
current_path = path
while current_path:
try:
page_data = await _rossum_request(method, current_path)
except Exception as e:
# Log or handle error, for now re-raising.
# Consider logging: print(f"Error fetching Rossum data from {current_path}: {e}")
raise
if page_data and "results" in page_data and isinstance(page_data.get("results"), list):
for item in page_data["results"]:
if summary_keys:
summary_item = {key: item.get(key) for key in summary_keys if key in item}
all_items.append(summary_item)
else:
all_items.append(item) # Append the full item if no summary_keys
pagination_info = page_data.get("pagination") if page_data else None
next_page_url = pagination_info.get("next") if pagination_info else None
if next_page_url:
if next_page_url.startswith(ROSSUM_API_BASE):
current_path = next_page_url[len(ROSSUM_API_BASE):]
else:
# Consider logging: print(f"Warning: next_page_url {next_page_url} does not match ROSSUM_API_BASE")
current_path = None
else:
current_path = None
return all_items
async def _get_queues_impl() -> List[Dict[str, Any]]:
"""Get all queues with essential fields only, handling pagination."""
summary_keys = [
"id",
"name",
"url",
"schema",
"workspace",
"status",
]
return await _rossum_unpaginated_request("GET", "/queues", summary_keys=summary_keys)
async def _get_schema_impl(schema_id: str):
"""Get a specific schema by ID"""
return await _rossum_request("GET", f"/schemas/{schema_id}")
async def _get_hooks_impl() -> List[Dict[str, Any]]:
"""Get all hooks with essential fields only, handling pagination."""
summary_keys = ["id", "name", "type", "url", "active", "events", "queues"]
return await _rossum_unpaginated_request("GET", "/hooks", summary_keys=summary_keys)
async def _get_hook_impl(hook_id: str):
"""Get a specific hook by ID including source code"""
return await _rossum_request("GET", f"/hooks/{hook_id}")
async def _get_workspaces_impl() -> List[Dict[str, Any]]:
"""Get all workspaces, handling pagination."""
# No summary_keys provided, so it will fetch all fields for each workspace.
return await _rossum_unpaginated_request("GET", "/workspaces")
async def _get_workspace_impl(workspace_id: str):
"""Get a specific workspace by ID"""
return await _rossum_request("GET", f"/workspaces/{workspace_id}")
# --- MCP Tool Definitions ---
@mcp.tool()
async def rossum_get_queues() -> List[Dict[str, Any]]:
"""Get all queues from the Rossum organization."""
return await _get_queues_impl()
@mcp.tool()
async def rossum_get_schema(schema_id: str) -> Dict[str, Any]:
"""Get a specific schema by its ID.
Args:
schema_id: The ID of the schema to retrieve
"""
return await _get_schema_impl(schema_id=schema_id)
@mcp.tool()
async def rossum_get_hooks() -> List[Dict[str, Any]]:
"""Get all serverless function hooks from the Rossum organization."""
return await _get_hooks_impl()
@mcp.tool()
async def rossum_get_hook(hook_id: str) -> Dict[str, Any]:
"""Get a specific serverless function hook by its ID, including its source code.
Args:
hook_id: The ID of the hook to retrieve
"""
return await _get_hook_impl(hook_id=hook_id)
@mcp.tool()
async def rossum_get_workspaces() -> List[Dict[str, Any]]:
"""Get all workspaces from the Rossum organization."""
return await _get_workspaces_impl()
@mcp.tool()
async def rossum_get_workspace(workspace_id: str) -> Dict[str, Any]:
"""Get a specific workspace by its ID.
Args:
workspace_id: The ID of the workspace to retrieve
"""
return await _get_workspace_impl(workspace_id=workspace_id)
async def cleanup():
"""Close the HTTP client on shutdown"""
await client.aclose()
if __name__ == "__main__":
import atexit
atexit.register(lambda: asyncio.run(cleanup()))
mcp.run()