#!/usr/bin/env python3
"""
AAP Gateway Generic API Tool
"""
from typing import Any, Dict, Optional
from fastmcp import FastMCP
from pydantic import Field
from connectors.gateway_connector import get_gateway_connector
def register_gateway_generic_api_tools(mcp: FastMCP):
    """Register generic Gateway API access tools with the MCP server.

    Defines the ``gateway_generic_api`` tool and registers it on *mcp*
    through the ``mcp.tool()`` decorator.
    """
    # The only HTTP verbs the tool dispatches on; anything else is rejected
    # before a connector is requested.
    supported_methods = ("GET", "POST", "PATCH", "DELETE")

    @mcp.tool()
    def gateway_generic_api(
        method: str = Field(description="HTTP method: GET, POST, PATCH, DELETE"),
        endpoint: str = Field(description="API endpoint path (e.g., 'services/', 'routes/', 'users/1/')"),
        data: Optional[Dict[str, Any]] = Field(None, description="Request body data for POST/PATCH requests"),
        params: Optional[Dict[str, Any]] = Field(None, description="Query parameters for GET requests")
    ) -> Dict[str, Any]:
        """
        Generic Gateway API access tool.
        Provides direct access to any AAP Gateway API endpoint with proper authentication.
        Examples:
        - GET /services/: method="GET", endpoint="services/"
        - GET /routes/1/: method="GET", endpoint="routes/1/"
        - POST /teams/: method="POST", endpoint="teams/", data={"name": "New Team", "organization": 1}
        - GET /users/: method="GET", endpoint="users/", params={"search": "admin"}

        Returns the connector's response for the request, or a dict of the
        form {"error": "..."} when the method is unsupported or the request
        raises an exception.
        """
        try:
            # Normalize case and surrounding whitespace so " get " is accepted.
            # Kept inside the try so a non-string `method` still yields an
            # error dict instead of propagating out of the tool.
            method = method.strip().upper()

            # Validate before acquiring the connector: an invalid method
            # should not trigger connector setup, and a connector failure
            # should not mask the real problem.
            if method not in supported_methods:
                return {"error": f"Unsupported HTTP method: {method}"}

            client = get_gateway_connector()
            if method == "GET":
                return client.get(endpoint, params)
            if method == "POST":
                return client.post(endpoint, data)
            if method == "PATCH":
                # PATCH always sends a body; fall back to an empty dict.
                return client.patch(endpoint, data or {})
            # Only DELETE remains after the validation above.
            return client.delete(endpoint)
        except Exception as e:
            # Tool boundary: report failures as data rather than raising.
            return {"error": f"Gateway API request failed: {str(e)}"}