embedded_mcp_server.py•17.3 kB
#!/usr/bin/env python3
"""
Embedded MCP Server - Generated by Ultimate Connector
"""
import os
import sys
import logging
import asyncio
import importlib.util
from datetime import datetime
from pathlib import Path
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from dotenv import load_dotenv
# MongoDB integration
try:
from pymongo import MongoClient
from mcp_mongodb_integration import MCPMongoDBIntegration
MONGODB_AVAILABLE = True
except ImportError:
MONGODB_AVAILABLE = False
print("⚠️ MongoDB integration not available")
# Inter-agent communication
try:
from inter_agent_communication import initialize_inter_agent_system, AgentCommunicationHub
INTER_AGENT_AVAILABLE = True
except ImportError:
INTER_AGENT_AVAILABLE = False
print("⚠️ Inter-agent communication not available")
load_dotenv()
# Add project paths
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(Path(__file__).parent / "agents"))
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("embedded_mcp_server")
app = FastAPI(title="Embedded MCP Server", version="1.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class MCPCommandRequest(BaseModel):
command: str
# Global state
loaded_agents = {}
server_ready = False
mongodb_integration = None
inter_agent_hub = None
AGENT_CONFIGS = {'math_agent': {'path': 'agents/specialized/math_agent.py', 'class_name': 'MathAgent', 'keywords': ['calculate', 'compute', 'math', '+', '-', '*', '/', '%', 'percent'], 'test_command': 'Calculate 20% of 500'}, 'document_agent': {'path': 'agents/core/document_processor.py', 'class_name': 'DocumentProcessorAgent', 'keywords': ['analyze', 'document', 'text', 'process'], 'test_command': 'Analyze this text: Hello world'}, 'gmail_agent': {'path': 'agents/communication/real_gmail_agent.py', 'class_name': 'RealGmailAgent', 'keywords': ['email', 'send', 'mail', '@'], 'test_command': 'Send email to test@example.com'}, 'calendar_agent': {'path': 'agents/specialized/calendar_agent.py', 'class_name': 'CalendarAgent', 'keywords': ['remind', 'reminder', 'schedule', 'calendar', 'meeting'], 'test_command': 'Create reminder for tomorrow'}, 'weather_agent': {'path': 'agents/data/realtime_weather_agent.py', 'class_name': 'RealTimeWeatherAgent', 'keywords': ['weather', 'temperature', 'temp', 'forecast', 'climate'], 'test_command': 'What is the weather in Mumbai?'}}
def _requires_multi_agent(command: str) -> bool:
"""Check if command requires multi-agent coordination."""
command_lower = command.lower()
# Multi-agent indicators
multi_agent_keywords = [
"based on", "and", "calculate.*weather", "weather.*calculate",
"cost.*heating", "heating.*cost", "analysis.*weather",
"weather.*analysis", "forecast.*math", "math.*forecast",
"document.*weather", "weather.*document"
]
# Count how many different agent types are needed
agent_indicators = {
"math": ["calculate", "compute", "cost", "percentage", "analysis"],
"weather": ["weather", "temperature", "forecast", "climate", "heating", "mumbai"],
"document": ["document", "text", "analyze", "process"]
}
triggered_agents = 0
for agent_type, keywords in agent_indicators.items():
if any(keyword in command_lower for keyword in keywords):
triggered_agents += 1
return triggered_agents > 1 or any(keyword in command_lower for keyword in multi_agent_keywords)
async def load_agent(agent_id: str, config: dict):
"""Load agent."""
try:
agent_path = Path(config["path"])
if not agent_path.exists():
return None
spec = importlib.util.spec_from_file_location(agent_id, agent_path)
if spec is None or spec.loader is None:
return None
module = importlib.util.module_from_spec(spec)
sys.modules[agent_id] = module
spec.loader.exec_module(module)
agent_class = getattr(module, config["class_name"], None)
if agent_class is None:
return None
return agent_class()
except:
return None
@app.on_event("startup")
async def startup_event():
"""Initialize server."""
global loaded_agents, server_ready, mongodb_integration, inter_agent_hub
logger.info("🚀 Starting Embedded MCP Server with Inter-Agent Communication...")
# Initialize MongoDB integration
if MONGODB_AVAILABLE:
try:
mongodb_integration = MCPMongoDBIntegration()
connected = await mongodb_integration.connect()
if connected:
logger.info("✅ MongoDB integration connected")
else:
logger.warning("⚠️ MongoDB integration failed")
except Exception as e:
logger.error(f"❌ MongoDB integration error: {e}")
# Initialize Inter-Agent Communication
if INTER_AGENT_AVAILABLE:
try:
inter_agent_hub = await initialize_inter_agent_system()
logger.info("✅ Inter-agent communication system initialized")
except Exception as e:
logger.error(f"❌ Inter-agent communication error: {e}")
for agent_id, config in AGENT_CONFIGS.items():
agent = await load_agent(agent_id, config)
if agent:
loaded_agents[agent_id] = {
"instance": agent,
"config": config,
"status": "loaded"
}
logger.info(f"✅ Loaded {agent_id}")
server_ready = True
logger.info(f"🎉 Server ready with {len(loaded_agents)} agents and inter-communication")
@app.get("/api/health")
async def health_check():
"""Health check."""
inter_agent_status = None
if inter_agent_hub:
inter_agent_status = inter_agent_hub.get_system_status()
return {
"status": "ok",
"server": "embedded_mcp_server",
"ready": server_ready,
"agents_loaded": len(loaded_agents),
"available_agents": list(loaded_agents.keys()),
"mongodb_connected": mongodb_integration is not None,
"inter_agent_communication": inter_agent_hub is not None,
"inter_agent_status": inter_agent_status,
"timestamp": datetime.now().isoformat()
}
@app.get("/", response_class=HTMLResponse)
async def serve_interface():
"""Main interface."""
return HTMLResponse("""
<!DOCTYPE html>
<html>
<head>
<title>MCP System - All Agents Connected</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; }
.container { max-width: 900px; margin: 0 auto; background: rgba(255,255,255,0.1); padding: 30px; border-radius: 15px; }
h1 { text-align: center; margin-bottom: 20px; }
.agent { background: rgba(255,255,255,0.1); padding: 15px; margin: 10px 0; border-radius: 8px; }
.btn { background: #4CAF50; color: white; padding: 10px 20px; border: none; border-radius: 5px; text-decoration: none; display: inline-block; margin: 5px; }
.status { color: #4CAF50; font-weight: bold; }
</style>
</head>
<body>
<div class="container">
<h1>🤖 MCP System - All Agents Connected</h1>
<p style="text-align: center; font-size: 1.2em;" class="status">✅ System Operational</p>
<div class="agent">
<h3>🔢 Math Agent</h3>
<p><strong>Example:</strong> Calculate 20% of 500</p>
<p><strong>Capabilities:</strong> Mathematical calculations, percentages, formulas</p>
</div>
<div class="agent">
<h3>📄 Document Agent</h3>
<p><strong>Example:</strong> Analyze this text: Hello world</p>
<p><strong>Capabilities:</strong> Text analysis, document processing, summarization</p>
</div>
<div class="agent">
<h3>📧 Gmail Agent</h3>
<p><strong>Example:</strong> Send email to test@example.com</p>
<p><strong>Capabilities:</strong> Email automation, notifications, communication</p>
</div>
<div class="agent">
<h3>📅 Calendar Agent</h3>
<p><strong>Example:</strong> Create reminder for tomorrow</p>
<p><strong>Capabilities:</strong> Scheduling, reminders, time management</p>
</div>
<div class="agent">
<h3>🌤️ Weather Agent</h3>
<p><strong>Example:</strong> What is the weather in Mumbai?</p>
<p><strong>Capabilities:</strong> Live weather data, forecasts, climate information</p>
</div>
<div style="text-align: center; margin-top: 30px;">
<a href="/docs" class="btn">📚 API Documentation</a>
<a href="/api/health" class="btn">🔍 Health Check</a>
</div>
</div>
</body>
</html>
""")
@app.post("/api/mcp/command")
async def process_command(request: MCPCommandRequest):
"""Process commands."""
if not server_ready:
raise HTTPException(status_code=503, detail="Server not ready")
try:
command = request.command.lower().strip()
# Find matching agent
matching_agent = None
agent_id = None
for aid, agent_data in loaded_agents.items():
keywords = agent_data["config"]["keywords"]
if any(keyword in command for keyword in keywords):
matching_agent = agent_data["instance"]
agent_id = aid
break
if not matching_agent:
return {
"status": "success",
"message": f"Command processed but no specific agent matched",
"command": request.command,
"available_agents": list(loaded_agents.keys()),
"timestamp": datetime.now().isoformat()
}
# Create message for agent
from agents.base_agent import MCPMessage
message = MCPMessage(
id=f"{agent_id}_{datetime.now().timestamp()}",
method="process",
params={"query": request.command, "expression": request.command},
timestamp=datetime.now()
)
# Check if this requires multi-agent coordination
if inter_agent_hub and _requires_multi_agent(request.command):
logger.info(f"🔗 Coordinating multi-agent task: {request.command}")
coordination_result = await inter_agent_hub.coordinate_multi_agent_task(request.command, agent_id)
# Store coordination result - FORCE STORAGE
if mongodb_integration:
try:
# Primary storage
mongodb_id = await mongodb_integration.store_command_result(
command=request.command,
agent_used="multi_agent_coordination",
result=coordination_result,
timestamp=datetime.now()
)
coordination_result["stored_in_mongodb"] = True
coordination_result["mongodb_id"] = mongodb_id
logger.info(f"✅ Stored coordination result in MongoDB")
except Exception as e:
logger.error(f"❌ Primary coordination storage failed: {e}")
# Fallback storage
try:
fallback_success = await mongodb_integration.force_store_result(
"multi_agent_coordination", request.command, coordination_result
)
coordination_result["stored_in_mongodb"] = fallback_success
coordination_result["storage_method"] = "fallback"
logger.info(f"✅ Fallback coordination storage successful")
except Exception as e2:
logger.error(f"❌ Fallback coordination storage failed: {e2}")
coordination_result["stored_in_mongodb"] = False
coordination_result["storage_error"] = str(e2)
else:
coordination_result["stored_in_mongodb"] = False
coordination_result["storage_error"] = "MongoDB integration not available"
return coordination_result
# Process with single agent
result = await matching_agent.process_message(message)
# Add metadata
result["agent_used"] = agent_id
result["server"] = "embedded_mcp_server"
result["timestamp"] = datetime.now().isoformat()
# Store in MongoDB - FORCE STORAGE WITH GUARANTEED REPORTING
if mongodb_integration:
try:
# Primary storage method
mongodb_id = await mongodb_integration.store_command_result(
command=request.command,
agent_used=agent_id,
result=result,
timestamp=datetime.now()
)
result["stored_in_mongodb"] = True
result["mongodb_id"] = mongodb_id
result["storage_method"] = "primary"
logger.info(f"✅ Stored command result in MongoDB: {agent_id}")
except Exception as e:
logger.error(f"❌ Primary storage failed: {e}")
# Fallback storage method
try:
fallback_success = await mongodb_integration.force_store_result(
agent_id, request.command, result
)
result["stored_in_mongodb"] = fallback_success
result["storage_method"] = "fallback"
if fallback_success:
logger.info(f"✅ Fallback storage successful: {agent_id}")
else:
logger.error(f"❌ Fallback storage failed: {agent_id}")
except Exception as e2:
logger.error(f"❌ Fallback storage also failed: {e2}")
result["stored_in_mongodb"] = False
result["storage_error"] = str(e2)
result["storage_method"] = "failed"
else:
result["stored_in_mongodb"] = False
result["storage_error"] = "MongoDB integration not available"
result["storage_method"] = "unavailable"
return result
except Exception as e:
return {
"status": "error",
"message": f"Command processing failed: {str(e)}",
"timestamp": datetime.now().isoformat()
}
@app.get("/api/agents")
async def list_agents():
"""List agents."""
return {
"status": "success",
"agents": {
agent_id: {
"status": agent_data["status"],
"class_name": agent_data["config"]["class_name"],
"keywords": agent_data["config"]["keywords"]
}
for agent_id, agent_data in loaded_agents.items()
},
"total_agents": len(loaded_agents),
"inter_agent_communication": inter_agent_hub is not None,
"timestamp": datetime.now().isoformat()
}
@app.post("/api/mcp/coordinate")
async def coordinate_agents(request: MCPCommandRequest):
"""Coordinate multi-agent task."""
if not server_ready:
raise HTTPException(status_code=503, detail="Server not ready")
if not inter_agent_hub:
raise HTTPException(status_code=503, detail="Inter-agent communication not available")
try:
result = await inter_agent_hub.coordinate_multi_agent_task(request.command)
# Store coordination result
if mongodb_integration:
try:
await mongodb_integration.store_command_result(
command=request.command,
agent_used="multi_agent_coordination",
result=result,
timestamp=datetime.now()
)
result["stored_in_mongodb"] = True
except Exception as e:
logger.error(f"❌ Failed to store coordination result: {e}")
result["stored_in_mongodb"] = False
return result
except Exception as e:
return {
"status": "error",
"message": f"Multi-agent coordination failed: {str(e)}",
"timestamp": datetime.now().isoformat()
}
@app.get("/api/inter-agent/status")
async def inter_agent_status():
"""Get inter-agent communication status."""
if not inter_agent_hub:
return {
"status": "unavailable",
"message": "Inter-agent communication not initialized"
}
return inter_agent_hub.get_system_status()
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")