Skip to main content
Glama

MCP Orchestration Server

embedded_mcp_server.py17.3 kB
#!/usr/bin/env python3 """ Embedded MCP Server - Generated by Ultimate Connector """ import os import sys import logging import asyncio import importlib.util from datetime import datetime from pathlib import Path from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse from pydantic import BaseModel from dotenv import load_dotenv # MongoDB integration try: from pymongo import MongoClient from mcp_mongodb_integration import MCPMongoDBIntegration MONGODB_AVAILABLE = True except ImportError: MONGODB_AVAILABLE = False print("⚠️ MongoDB integration not available") # Inter-agent communication try: from inter_agent_communication import initialize_inter_agent_system, AgentCommunicationHub INTER_AGENT_AVAILABLE = True except ImportError: INTER_AGENT_AVAILABLE = False print("⚠️ Inter-agent communication not available") load_dotenv() # Add project paths sys.path.insert(0, str(Path(__file__).parent)) sys.path.insert(0, str(Path(__file__).parent / "agents")) logging.basicConfig(level=logging.INFO) logger = logging.getLogger("embedded_mcp_server") app = FastAPI(title="Embedded MCP Server", version="1.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class MCPCommandRequest(BaseModel): command: str # Global state loaded_agents = {} server_ready = False mongodb_integration = None inter_agent_hub = None AGENT_CONFIGS = {'math_agent': {'path': 'agents/specialized/math_agent.py', 'class_name': 'MathAgent', 'keywords': ['calculate', 'compute', 'math', '+', '-', '*', '/', '%', 'percent'], 'test_command': 'Calculate 20% of 500'}, 'document_agent': {'path': 'agents/core/document_processor.py', 'class_name': 'DocumentProcessorAgent', 'keywords': ['analyze', 'document', 'text', 'process'], 'test_command': 'Analyze this text: Hello world'}, 'gmail_agent': {'path': 'agents/communication/real_gmail_agent.py', 'class_name': 'RealGmailAgent', 'keywords': ['email', 'send', 'mail', '@'], 'test_command': 'Send email to test@example.com'}, 'calendar_agent': {'path': 'agents/specialized/calendar_agent.py', 'class_name': 'CalendarAgent', 'keywords': ['remind', 'reminder', 'schedule', 'calendar', 'meeting'], 'test_command': 'Create reminder for tomorrow'}, 'weather_agent': {'path': 'agents/data/realtime_weather_agent.py', 'class_name': 'RealTimeWeatherAgent', 'keywords': ['weather', 'temperature', 'temp', 'forecast', 'climate'], 'test_command': 'What is the weather in Mumbai?'}} def _requires_multi_agent(command: str) -> bool: """Check if command requires multi-agent coordination.""" command_lower = command.lower() # Multi-agent indicators multi_agent_keywords = [ "based on", "and", "calculate.*weather", "weather.*calculate", "cost.*heating", "heating.*cost", "analysis.*weather", "weather.*analysis", "forecast.*math", "math.*forecast", "document.*weather", "weather.*document" ] # Count how many different agent types are needed agent_indicators = { "math": ["calculate", "compute", "cost", "percentage", "analysis"], "weather": ["weather", "temperature", "forecast", "climate", "heating", "mumbai"], "document": ["document", "text", "analyze", "process"] } triggered_agents = 0 for agent_type, keywords in agent_indicators.items(): if any(keyword in command_lower for keyword in keywords): triggered_agents += 1 return triggered_agents > 1 or any(keyword in command_lower for keyword in multi_agent_keywords) async def load_agent(agent_id: str, config: dict): """Load agent.""" try: agent_path = Path(config["path"]) if not agent_path.exists(): return None spec = importlib.util.spec_from_file_location(agent_id, agent_path) if spec is None or spec.loader is None: return None module = importlib.util.module_from_spec(spec) sys.modules[agent_id] = module spec.loader.exec_module(module) agent_class = getattr(module, config["class_name"], None) if agent_class is None: return None return agent_class() except: return None @app.on_event("startup") async def startup_event(): """Initialize server.""" global loaded_agents, server_ready, mongodb_integration, inter_agent_hub logger.info("🚀 Starting Embedded MCP Server with Inter-Agent Communication...") # Initialize MongoDB integration if MONGODB_AVAILABLE: try: mongodb_integration = MCPMongoDBIntegration() connected = await mongodb_integration.connect() if connected: logger.info("✅ MongoDB integration connected") else: logger.warning("⚠️ MongoDB integration failed") except Exception as e: logger.error(f"❌ MongoDB integration error: {e}") # Initialize Inter-Agent Communication if INTER_AGENT_AVAILABLE: try: inter_agent_hub = await initialize_inter_agent_system() logger.info("✅ Inter-agent communication system initialized") except Exception as e: logger.error(f"❌ Inter-agent communication error: {e}") for agent_id, config in AGENT_CONFIGS.items(): agent = await load_agent(agent_id, config) if agent: loaded_agents[agent_id] = { "instance": agent, "config": config, "status": "loaded" } logger.info(f"✅ Loaded {agent_id}") server_ready = True logger.info(f"🎉 Server ready with {len(loaded_agents)} agents and inter-communication") @app.get("/api/health") async def health_check(): """Health check.""" inter_agent_status = None if inter_agent_hub: inter_agent_status = inter_agent_hub.get_system_status() return { "status": "ok", "server": "embedded_mcp_server", "ready": server_ready, "agents_loaded": len(loaded_agents), "available_agents": list(loaded_agents.keys()), "mongodb_connected": mongodb_integration is not None, "inter_agent_communication": inter_agent_hub is not None, "inter_agent_status": inter_agent_status, "timestamp": datetime.now().isoformat() } @app.get("/", response_class=HTMLResponse) async def serve_interface(): """Main interface.""" return HTMLResponse(""" <!DOCTYPE html> <html> <head> <title>MCP System - All Agents Connected</title> <style> body { font-family: Arial, sans-serif; margin: 40px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; } .container { max-width: 900px; margin: 0 auto; background: rgba(255,255,255,0.1); padding: 30px; border-radius: 15px; } h1 { text-align: center; margin-bottom: 20px; } .agent { background: rgba(255,255,255,0.1); padding: 15px; margin: 10px 0; border-radius: 8px; } .btn { background: #4CAF50; color: white; padding: 10px 20px; border: none; border-radius: 5px; text-decoration: none; display: inline-block; margin: 5px; } .status { color: #4CAF50; font-weight: bold; } </style> </head> <body> <div class="container"> <h1>🤖 MCP System - All Agents Connected</h1> <p style="text-align: center; font-size: 1.2em;" class="status">✅ System Operational</p> <div class="agent"> <h3>🔢 Math Agent</h3> <p><strong>Example:</strong> Calculate 20% of 500</p> <p><strong>Capabilities:</strong> Mathematical calculations, percentages, formulas</p> </div> <div class="agent"> <h3>📄 Document Agent</h3> <p><strong>Example:</strong> Analyze this text: Hello world</p> <p><strong>Capabilities:</strong> Text analysis, document processing, summarization</p> </div> <div class="agent"> <h3>📧 Gmail Agent</h3> <p><strong>Example:</strong> Send email to test@example.com</p> <p><strong>Capabilities:</strong> Email automation, notifications, communication</p> </div> <div class="agent"> <h3>📅 Calendar Agent</h3> <p><strong>Example:</strong> Create reminder for tomorrow</p> <p><strong>Capabilities:</strong> Scheduling, reminders, time management</p> </div> <div class="agent"> <h3>🌤️ Weather Agent</h3> <p><strong>Example:</strong> What is the weather in Mumbai?</p> <p><strong>Capabilities:</strong> Live weather data, forecasts, climate information</p> </div> <div style="text-align: center; margin-top: 30px;"> <a href="/docs" class="btn">📚 API Documentation</a> <a href="/api/health" class="btn">🔍 Health Check</a> </div> </div> </body> </html> """) @app.post("/api/mcp/command") async def process_command(request: MCPCommandRequest): """Process commands.""" if not server_ready: raise HTTPException(status_code=503, detail="Server not ready") try: command = request.command.lower().strip() # Find matching agent matching_agent = None agent_id = None for aid, agent_data in loaded_agents.items(): keywords = agent_data["config"]["keywords"] if any(keyword in command for keyword in keywords): matching_agent = agent_data["instance"] agent_id = aid break if not matching_agent: return { "status": "success", "message": f"Command processed but no specific agent matched", "command": request.command, "available_agents": list(loaded_agents.keys()), "timestamp": datetime.now().isoformat() } # Create message for agent from agents.base_agent import MCPMessage message = MCPMessage( id=f"{agent_id}_{datetime.now().timestamp()}", method="process", params={"query": request.command, "expression": request.command}, timestamp=datetime.now() ) # Check if this requires multi-agent coordination if inter_agent_hub and _requires_multi_agent(request.command): logger.info(f"🔗 Coordinating multi-agent task: {request.command}") coordination_result = await inter_agent_hub.coordinate_multi_agent_task(request.command, agent_id) # Store coordination result - FORCE STORAGE if mongodb_integration: try: # Primary storage mongodb_id = await mongodb_integration.store_command_result( command=request.command, agent_used="multi_agent_coordination", result=coordination_result, timestamp=datetime.now() ) coordination_result["stored_in_mongodb"] = True coordination_result["mongodb_id"] = mongodb_id logger.info(f"✅ Stored coordination result in MongoDB") except Exception as e: logger.error(f"❌ Primary coordination storage failed: {e}") # Fallback storage try: fallback_success = await mongodb_integration.force_store_result( "multi_agent_coordination", request.command, coordination_result ) coordination_result["stored_in_mongodb"] = fallback_success coordination_result["storage_method"] = "fallback" logger.info(f"✅ Fallback coordination storage successful") except Exception as e2: logger.error(f"❌ Fallback coordination storage failed: {e2}") coordination_result["stored_in_mongodb"] = False coordination_result["storage_error"] = str(e2) else: coordination_result["stored_in_mongodb"] = False coordination_result["storage_error"] = "MongoDB integration not available" return coordination_result # Process with single agent result = await matching_agent.process_message(message) # Add metadata result["agent_used"] = agent_id result["server"] = "embedded_mcp_server" result["timestamp"] = datetime.now().isoformat() # Store in MongoDB - FORCE STORAGE WITH GUARANTEED REPORTING if mongodb_integration: try: # Primary storage method mongodb_id = await mongodb_integration.store_command_result( command=request.command, agent_used=agent_id, result=result, timestamp=datetime.now() ) result["stored_in_mongodb"] = True result["mongodb_id"] = mongodb_id result["storage_method"] = "primary" logger.info(f"✅ Stored command result in MongoDB: {agent_id}") except Exception as e: logger.error(f"❌ Primary storage failed: {e}") # Fallback storage method try: fallback_success = await mongodb_integration.force_store_result( agent_id, request.command, result ) result["stored_in_mongodb"] = fallback_success result["storage_method"] = "fallback" if fallback_success: logger.info(f"✅ Fallback storage successful: {agent_id}") else: logger.error(f"❌ Fallback storage failed: {agent_id}") except Exception as e2: logger.error(f"❌ Fallback storage also failed: {e2}") result["stored_in_mongodb"] = False result["storage_error"] = str(e2) result["storage_method"] = "failed" else: result["stored_in_mongodb"] = False result["storage_error"] = "MongoDB integration not available" result["storage_method"] = "unavailable" return result except Exception as e: return { "status": "error", "message": f"Command processing failed: {str(e)}", "timestamp": datetime.now().isoformat() } @app.get("/api/agents") async def list_agents(): """List agents.""" return { "status": "success", "agents": { agent_id: { "status": agent_data["status"], "class_name": agent_data["config"]["class_name"], "keywords": agent_data["config"]["keywords"] } for agent_id, agent_data in loaded_agents.items() }, "total_agents": len(loaded_agents), "inter_agent_communication": inter_agent_hub is not None, "timestamp": datetime.now().isoformat() } @app.post("/api/mcp/coordinate") async def coordinate_agents(request: MCPCommandRequest): """Coordinate multi-agent task.""" if not server_ready: raise HTTPException(status_code=503, detail="Server not ready") if not inter_agent_hub: raise HTTPException(status_code=503, detail="Inter-agent communication not available") try: result = await inter_agent_hub.coordinate_multi_agent_task(request.command) # Store coordination result if mongodb_integration: try: await mongodb_integration.store_command_result( command=request.command, agent_used="multi_agent_coordination", result=result, timestamp=datetime.now() ) result["stored_in_mongodb"] = True except Exception as e: logger.error(f"❌ Failed to store coordination result: {e}") result["stored_in_mongodb"] = False return result except Exception as e: return { "status": "error", "message": f"Multi-agent coordination failed: {str(e)}", "timestamp": datetime.now().isoformat() } @app.get("/api/inter-agent/status") async def inter_agent_status(): """Get inter-agent communication status.""" if not inter_agent_hub: return { "status": "unavailable", "message": "Inter-agent communication not initialized" } return inter_agent_hub.get_system_status() if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Nisarg-123-web/MCP2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server