MCP Gateway for RFK Jr Endpoints

import os import sys import yaml import httpx import asyncio from typing import Dict, Any, Optional, List from fastmcp import FastMCP, Context from mcp.server.sse import SseServerTransport from pydantic import BaseModel, Field from dotenv import load_dotenv def log(message): with open("/tmp/mcp.log", "a") as log_file: log_file.write(f"{message}\n") log_file.flush() if os.environ.get("FASTMCP_HTTP") == "1": print(message) else: sys.stderr.write(f"{message}\n") sys.stderr.flush() # Load environment variables from .env file load_dotenv() # Load the YAML configuration file CONFIG_PATH = os.environ.get( "CONFIG_PATH", "/Users/gregory/g/projects/llm/roma/rfk_jr/rfkjr.yaml") config = {} with open(CONFIG_PATH, "r") as file: log(f"Loading config from {CONFIG_PATH}") config = yaml.safe_load(file) log(f"Config loaded: {config}") # Create the MCP server mcp = FastMCP( "RFK Jr Bridge", description="Bridge between MCP and RFK Jr endpoints", dependencies=[ "pyyaml", "httpx", "pydantic", ], ) mcp.settings.debug = True mcp.settings.log_level = "DEBUG" # Dictionary to store OpenAPI schemas for each endpoint endpoint_schemas = {} async def fetch_openapi_schema(url: str) -> Dict[str, Any]: """Fetch the OpenAPI schema from an endpoint.""" async with httpx.AsyncClient() as client: response = await client.get(f"{url}/openapi.json") return response.json() async def fetch_prompt(url: str) -> Optional[str]: """Fetch the prompt from an endpoint if available.""" try: async with httpx.AsyncClient() as client: response = await client.get(f"{url}/__prompt__") if response.status_code == 200: return response.text return None except Exception: return None async def initialize_endpoints(): """Initialize the endpoints by fetching their OpenAPI schemas.""" for endpoint_name, endpoint_config in config.items(): if endpoint_config.get("url") == "canvas" or endpoint_config.get("url", "").startswith("#"): continue url = endpoint_config["url"] try: endpoint_schemas[endpoint_name] = await fetch_openapi_schema(url) # Register tools for each endpoint register_endpoint_tools( endpoint_name, url, endpoint_schemas[endpoint_name]) # Register prompt if available prompt = await fetch_prompt(url) if prompt: register_endpoint_prompt(endpoint_name, prompt) except Exception as e: log(f"Error initializing endpoint {endpoint_name}: {e}") log(f"Initialized endpoints: {set(endpoint_schemas.keys())}") def register_endpoint_tools(endpoint_name: str, url: str, schema: Dict[str, Any]): """Register tools for an endpoint based on its OpenAPI schema.""" for path, path_item in schema.get("paths", {}).items(): for method, operation in path_item.items(): # Check if this operation is marked for CPM if "CPM" in operation or "x-CPM" in operation: operation_id = operation.get("operationId") if not operation_id: continue # Create a tool for this operation tool_name = f"{endpoint_name}_{operation_id}" # Extract parameters parameters = [] if "parameters" in operation: parameters = operation["parameters"] # Extract request body schema if present request_body_schema = None if "requestBody" in operation: content_type = next( iter(operation["requestBody"].get("content", {})), None) if content_type: request_body_schema = operation["requestBody"]["content"][content_type].get( "schema") # Register the tool register_tool(tool_name, endpoint_name, url, path, method, operation, parameters, request_body_schema) def register_tool( tool_name: str, endpoint_name: str, url: str, path: str, method: str, operation: Dict[str, Any], parameters: List[Dict[str, Any]], request_body_schema: Optional[Dict[str, Any]] ): """Register a tool for a specific endpoint operation.""" # Create a dynamic tool function async def tool_function(query): async with httpx.AsyncClient() as client: # Handle path parameters formatted_path = path # Prepare request request_url = f"{url}{formatted_path}" # Check if the request expects form-encoded data is_form_encoded = False if request_body_schema and "content" in operation.get("requestBody", {}): content_types = operation["requestBody"]["content"].keys() is_form_encoded = any("form" in ct.lower() for ct in content_types) if method.lower() == "get": # For GET requests, use query parameters response = await client.get(request_url, params=query) elif method.lower() == "post": if is_form_encoded: # For POST requests with form-encoded data log(f"Sending form data: {query}") response = await client.post(request_url, data=query) else: # For POST requests with JSON body response = await client.post(request_url, json=query) else: # Handle other methods as needed response = await client.request(method, request_url, json=query) # Return the response return response.text # Set function metadata tool_function.__name__ = tool_name operation_id = operation.get("operationId", "unknown") tool_function.__doc__ = operation.get( "description", f"Call {endpoint_name} {operation_id}") # Register the tool with FastMCP mcp.tool()(tool_function) def register_endpoint_prompt(endpoint_name: str, prompt_text: str): """Register a prompt for an endpoint.""" @mcp.prompt() def endpoint_prompt() -> str: return prompt_text # Set function name endpoint_prompt.__name__ = f"{endpoint_name}_prompt" # Add a resource to get the list of available endpoints @mcp.resource("endpoints://list") def get_endpoints() -> str: """Get a list of available endpoints.""" result = [] for endpoint_name, endpoint_config in config.items(): if endpoint_config.get("url") != "canvas" and not endpoint_config.get("url", "").startswith("#"): result.append(f"{endpoint_name}: {endpoint_config['url']}") return "\n".join(result) # Add a tool to get information about a specific endpoint @mcp.tool() def get_endpoint_info(endpoint_name: str) -> str: """Get information about a specific endpoint.""" if endpoint_name not in config: return f"Endpoint {endpoint_name} not found." endpoint_config = config[endpoint_name] if endpoint_config.get("url") == "canvas" or endpoint_config.get("url", "").startswith("#"): return f"Endpoint {endpoint_name} is not a standard API endpoint." schema = endpoint_schemas.get(endpoint_name) if not schema: return f"Schema for endpoint {endpoint_name} not available." # Extract and format endpoint information info = schema.get("info", {}) paths = schema.get("paths", {}) result = [ f"Endpoint: {endpoint_name}", f"URL: {endpoint_config['url']}", f"Title: {info.get('title', 'N/A')}", f"Description: {info.get('description', 'N/A')}", f"Version: {info.get('version', 'N/A')}", "\nAvailable Operations:" ] for path, path_item in paths.items(): for method, operation in path_item.items(): if "CPM" in operation or "x-CPM" in operation: result.append( f" - {method.upper()} {path}: {operation.get('summary', 'N/A')}") return "\n".join(result) def main(): log("Hello, world!") asyncio.run(initialize_endpoints()) if os.environ.get("FASTMCP_HTTP") == "1": log("Running in HTTP mode") port = int(os.environ.get("PORT", 10000)) mcp.settings.port = port # uvicorn.run(app, host="0.0.0.0", port=port) asyncio.run(mcp.run(transport="sse")) else: # Run in normal mode (stdio) log("Running in stdio mode") asyncio.run(mcp.run()) log(f"Imported as {__name__}") if __name__ == "__main__": main() else: log("Running with inspector") # Use asyncio.run() to properly await the coroutines asyncio.run(initialize_endpoints()) asyncio.run(mcp.run())