Skip to main content
Glama

MCP Aggregator Server

aggregator_server.py (20.3 kB)
""" MCP Aggregator Server ===================== Unified MCP interface that proxies requests to multiple backend MCP servers. Architecture: - Aggregator Server (Port 8003) - This server - ZepAI Memory Server (Port 8002) - Knowledge Graph + Conversation Memory - LTM Vector Server (Port 8000) - Vector Database + Code Indexing Current Status: ZepAI Memory Server (8002) - READY (4 tools) LTM Vector Server (8000) - READY (6 tools) Total: 12 tools available (consolidated from 17) The Aggregator exposes all tools from both servers as a single MCP interface. Clients connect to the Aggregator and can access all tools transparently. """ from fastmcp import FastMCP import sys import os import uvicorn import logging import asyncio from typing import Dict, Any, Optional from fastapi.middleware.cors import CORSMiddleware from fastapi import FastAPI, Request, status from fastapi.responses import JSONResponse from fastapi.exceptions import RequestValidationError from starlette.exceptions import HTTPException as StarletteHTTPException # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Fix Windows console encoding if sys.platform == 'win32': sys.stdout.reconfigure(encoding='utf-8') # Import config and clients from config import config from mcp_client import get_clients, close_clients # ============================================================================ # CREATE MCP SERVER # ============================================================================ mcp = FastMCP(name=config.AGGREGATOR_NAME) logger.info(f"Created MCP server: {mcp.name}") logger.info("Status: ZepAI (8002) ✅ | LTM (8000) ✅ | Total: 12 tools (consolidated)") # ============================================================================ # HEALTH & STATUS TOOLS # ============================================================================ @mcp.tool() async def health_check() -> Dict[str, Any]: """ 
Check health status of all connected servers Returns: Dictionary with health status of each server """ clients = await get_clients() health_status = await clients.health_check_all() return { "status": "healthy" if all(health_status.values()) else "degraded", "servers": health_status, "aggregator": "healthy" } @mcp.tool() async def get_server_info() -> Dict[str, Any]: """ Get information about all connected servers and available tools Returns: Dictionary with server information and available endpoints """ clients = await get_clients() schemas = await clients.get_all_schemas() info = { "aggregator": { "name": config.AGGREGATOR_NAME, "version": config.AGGREGATOR_VERSION, "url": f"http://{config.AGGREGATOR_HOST}:{config.AGGREGATOR_PORT}", }, "connected_servers": {} } # ZepAI Memory Server info if schemas.get("memory_server"): memory_schema = schemas["memory_server"] info["connected_servers"]["zepai_memory_server"] = { "url": config.MEMORY_SERVER_URL, "title": memory_schema.get("info", {}).get("title", "ZepAI Memory Layer"), "version": memory_schema.get("info", {}).get("version", "Unknown"), "endpoints_count": len(memory_schema.get("paths", {})), "tools": 4, "description": "Knowledge Graph + Conversation Memory (consolidated)" } # LTM Vector Server info if schemas.get("ltm_server"): ltm_schema = schemas["ltm_server"] info["connected_servers"]["ltm_vector_server"] = { "url": config.LTM_SERVER_URL, "title": ltm_schema.get("info", {}).get("title", "LTM Vector Database"), "version": ltm_schema.get("info", {}).get("version", "Unknown"), "endpoints_count": len(ltm_schema.get("paths", {})), "tools": 6, "description": "Vector Database + Code Indexing (consolidated)" } return info # ============================================================================ # MEMORY SERVER TOOLS (Proxy to Port 8002) # ============================================================================ @mcp.tool() async def memory_search( query: str, project_id: Optional[str] = None, limit: int = 10, 
use_llm_classification: bool = False ) -> Dict[str, Any]: """ Search in memory knowledge graph Args: query: Search query string project_id: Project ID (optional) limit: Maximum number of results use_llm_classification: Use LLM for classification Returns: Search results from memory server """ clients = await get_clients() payload = { "query": query, "limit": limit, "use_llm_classification": use_llm_classification } if project_id: payload["project_id"] = project_id return await clients.memory_client.proxy_request( "POST", "/search", json_data=payload, retries=config.MAX_RETRIES ) @mcp.tool() async def memory_search_code( query: str, project_id: Optional[str] = None, limit: int = 10 ) -> Dict[str, Any]: """ Search code memories with filters Args: query: Search query string project_id: Project ID (optional) limit: Maximum number of results Returns: Code search results from memory server """ clients = await get_clients() payload = { "query": query, "limit": limit } if project_id: payload["project_id"] = project_id return await clients.memory_client.proxy_request( "POST", "/search/code", json_data=payload, retries=config.MAX_RETRIES ) @mcp.tool() async def memory_ingest( content: str | Dict[str, Any], content_type: str = "text", project_id: Optional[str] = None, language: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Universal ingest tool for all content types Args: content: Content to ingest (string for text/code, dict for json/conversation) content_type: Type of content - "text", "code", "json", or "conversation" project_id: Project ID (optional) language: Programming language (required for code type) metadata: Additional metadata (optional) Returns: Ingestion result from memory server Examples: # Ingest text memory_ingest("Hello world", "text", project_id="proj1") # Ingest code memory_ingest("def hello(): pass", "code", language="python") # Ingest JSON memory_ingest({"key": "value"}, "json") # Ingest conversation 
memory_ingest({"messages": [...]}, "conversation") """ clients = await get_clients() # Endpoint mapping endpoint_map = { "text": "/ingest/text", "code": "/ingest/code", "json": "/ingest/json", "conversation": "/conversation/ingest" } if content_type not in endpoint_map: return { "error": f"Invalid content_type: {content_type}", "valid_types": list(endpoint_map.keys()) } # Build payload based on content type if content_type == "text": payload = { "text": content, "metadata": metadata or {} } elif content_type == "code": payload = { "code": content, "metadata": metadata or {} } if language: payload["language"] = language elif content_type == "json": payload = { "data": content, "metadata": metadata or {} } elif content_type == "conversation": payload = content.copy() if isinstance(content, dict) else {} if project_id: payload["project_id"] = project_id return await clients.memory_client.proxy_request( "POST", endpoint_map[content_type], json_data=payload, retries=config.MAX_RETRIES ) @mcp.tool() async def memory_stats( stats_type: str = "project", project_id: Optional[str] = None ) -> Dict[str, Any]: """ Get statistics from memory server Args: stats_type: Type of stats - "project" or "cache" project_id: Project ID (required for project stats) Returns: Statistics from memory server Examples: # Get project stats memory_stats("project", project_id="proj1") # Get cache stats memory_stats("cache") """ clients = await get_clients() if stats_type == "project": if not project_id: return {"error": "project_id is required for project stats"} return await clients.memory_client.proxy_request( "GET", f"/stats/{project_id}", retries=config.MAX_RETRIES ) elif stats_type == "cache": return await clients.memory_client.proxy_request( "GET", "/cache/stats", retries=config.MAX_RETRIES ) else: return { "error": f"Invalid stats_type: {stats_type}", "valid_types": ["project", "cache"] } # ============================================================================ # LTM VECTOR DB TOOLS 
(Proxy to Port 8000) # ============================================================================ @mcp.tool() async def ltm_process_repo(repo_path: str) -> Dict[str, Any]: """ Process repository for vector indexing Args: repo_path: Path to repository to process Returns: Processing result from LTM server """ clients = await get_clients() return await clients.ltm_client.proxy_request( "POST", "/repos/process", params={"repo_path": repo_path}, retries=config.MAX_RETRIES ) @mcp.tool() async def ltm_query_vector(query: str, top_k: int = 10) -> Dict[str, Any]: """ Query vector database for semantic code search Args: query: Search query string top_k: Number of top results to return Returns: Vector search results from LTM server """ clients = await get_clients() return await clients.ltm_client.proxy_request( "GET", "/vectors/query", params={"query": query, "top_k": top_k}, retries=config.MAX_RETRIES ) @mcp.tool() async def ltm_search_file(filepath: str) -> Dict[str, Any]: """ Search for specific file in vector database Args: filepath: Path to file to search Returns: File search results from LTM server """ clients = await get_clients() return await clients.ltm_client.proxy_request( "GET", "/vectors/files", params={"filepath": filepath}, retries=config.MAX_RETRIES ) @mcp.tool() async def ltm_add_file(filepath: str) -> Dict[str, Any]: """ Add file to vector database Args: filepath: Path to file to add Returns: Addition result from LTM server """ clients = await get_clients() return await clients.ltm_client.proxy_request( "POST", "/vectors/files", params={"filepath": filepath}, retries=config.MAX_RETRIES ) @mcp.tool() async def ltm_delete( filepath: Optional[str] = None, uuids: Optional[list] = None ) -> Dict[str, Any]: """ Delete from vector database by filepath or UUIDs Args: filepath: Path to file to delete (optional) uuids: List of UUIDs to delete (optional) Returns: Deletion result from LTM server Examples: # Delete by filepath ltm_delete(filepath="/path/to/file.py") # 
Delete by UUIDs ltm_delete(uuids=["uuid1", "uuid2"]) Note: Must provide either filepath or uuids, not both """ clients = await get_clients() if filepath and uuids: return {"error": "Provide either filepath or uuids, not both"} if filepath: return await clients.ltm_client.proxy_request( "DELETE", "/vectors/filepath", params={"filepath": filepath}, retries=config.MAX_RETRIES ) elif uuids: return await clients.ltm_client.proxy_request( "DELETE", "/vectors/uuids", json_data=uuids, retries=config.MAX_RETRIES ) else: return {"error": "Must provide either filepath or uuids"} @mcp.tool() async def ltm_chunk_file(file_path: str) -> Dict[str, Any]: """ Chunk file using AST-based chunking Args: file_path: Path to file to chunk Returns: Chunking result from LTM server """ clients = await get_clients() return await clients.ltm_client.proxy_request( "POST", "/files/chunks", params={"file_path": file_path}, retries=config.MAX_RETRIES ) # ============================================================================ # HTTP SERVER SETUP - SAME PATTERN AS FASTMCP_SERVER # ============================================================================ def create_http_app(): """ Create HTTP app using COMBINED ROUTES pattern from FastMCP docs Reference: https://gofastmcp.com/integrations/fastapi#offering-an-llm-friendly-api This pattern combines MCP routes and custom routes into a single app. SAME as fastmcp_server/server_http.py implementation. 
""" # Create the MCP's ASGI app with '/mcp' path mcp_app = mcp.http_app(path='/mcp') # Create a new FastAPI app that combines MCP routes combined_app = FastAPI( title=config.AGGREGATOR_NAME, description="Unified MCP Aggregator - Proxies to multiple MCP servers", version=config.AGGREGATOR_VERSION, routes=[*mcp_app.routes], # MCP routes at /mcp/* lifespan=mcp_app.lifespan, # Important: use MCP lifespan ) # ======================================================================== # ADD ROOT ENDPOINT # ======================================================================== @combined_app.get("/", operation_id="root") async def root(): """Root endpoint - Server information""" return { "status": "ok", "service": "MCP Aggregator", "version": config.AGGREGATOR_VERSION, "name": config.AGGREGATOR_NAME, "endpoints": { "mcp": "/mcp", "openapi_docs": "/docs", "openapi_schema": "/openapi.json", }, "connected_servers": { "zepai_memory_server": config.MEMORY_SERVER_URL, "ltm_vector_server": config.LTM_SERVER_URL }, "tools_count": 12, "tools_breakdown": { "health": 2, "memory": 4, "vector": 6 }, "note": "Consolidated from 17 tools for better usability" } # ======================================================================== # ADD CORS MIDDLEWARE # ======================================================================== combined_app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) logger.info("CORS middleware added") # ======================================================================== # ADD EXCEPTION HANDLERS # ======================================================================== @combined_app.exception_handler(StarletteHTTPException) async def http_exception_handler(request: Request, exc: StarletteHTTPException): """Handle HTTP exceptions""" logger.error(f"HTTP {exc.status_code} error: {exc.detail} - Path: {request.url.path}") return JSONResponse( status_code=exc.status_code, content={ "error": 
exc.detail, "path": str(request.url.path), "status_code": exc.status_code } ) @combined_app.exception_handler(RequestValidationError) async def validation_exception_handler(request: Request, exc: RequestValidationError): """Handle Pydantic validation errors""" logger.error(f"Validation error: {exc.errors()} - Path: {request.url.path}") return JSONResponse( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content={ "error": "Validation error", "details": exc.errors(), "path": str(request.url.path) } ) @combined_app.exception_handler(Exception) async def general_exception_handler(request: Request, exc: Exception): """Handle all other unhandled exceptions""" logger.exception(f"Unhandled exception: {str(exc)} - Path: {request.url.path}") return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={ "error": "Internal server error", "message": str(exc), "path": str(request.url.path), "type": type(exc).__name__ } ) logger.info(f"Combined MCP routes") logger.info(f"Total routes: {len(combined_app.routes)}") return combined_app # ============================================================================ # MAIN ENTRY POINT # ============================================================================ if __name__ == "__main__": logger.info("=" * 70) logger.info("MCP Aggregator Server - Unified Knowledge Interface") logger.info("=" * 70) logger.info(f"Server: {config.AGGREGATOR_NAME} v{config.AGGREGATOR_VERSION}") logger.info(f"") logger.info(f"Connected Servers:") logger.info(f" ✅ ZepAI Memory Server: {config.MEMORY_SERVER_URL}") logger.info(f" - Knowledge Graph + Conversation Memory") logger.info(f" - 10 tools available") logger.info(f"") logger.info(f" ✅ LTM Vector Server: {config.LTM_SERVER_URL}") logger.info(f" - Vector Database + Code Indexing") logger.info(f" - 7 tools available") logger.info(f"") logger.info(f"HTTP Endpoints:") logger.info(f" - MCP Endpoint: http://{config.AGGREGATOR_HOST}:{config.AGGREGATOR_PORT}/mcp") logger.info(f" - OpenAPI 
Docs: http://{config.AGGREGATOR_HOST}:{config.AGGREGATOR_PORT}/docs") logger.info(f" - Root API: http://{config.AGGREGATOR_HOST}:{config.AGGREGATOR_PORT}/") logger.info(f"") logger.info(f"MCP Tools: 12 tools available (2 health + 4 memory + 6 vector)") logger.info(f" Note: Consolidated from 17 tools for better usability") logger.info(f"") logger.info(f"To use:") logger.info(f" 1. View docs: http://{config.AGGREGATOR_HOST}:{config.AGGREGATOR_PORT}/docs") logger.info(f" 2. Connect MCP clients to: http://{config.AGGREGATOR_HOST}:{config.AGGREGATOR_PORT}/mcp") logger.info(f" (MCP protocol handles SSE/messages automatically)") logger.info(f" 3. Run tests: python test_final.py") logger.info("=" * 70) # Create HTTP app http_app = create_http_app() logger.info(f"Starting HTTP MCP server on port {config.AGGREGATOR_PORT}...") logger.info("Press Ctrl+C to stop") # Run with uvicorn with optimized configuration - SAME AS FASTMCP_SERVER try: uvicorn.run( http_app, host=config.AGGREGATOR_HOST, port=config.AGGREGATOR_PORT, log_level="info", access_log=True, use_colors=True, limit_concurrency=100, timeout_keep_alive=5, ) except KeyboardInterrupt: logger.info("Shutting down...") asyncio.run(close_clients())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/NguyenTrinh3008/mcp_agg'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.