Skip to main content
Glama

MCP Orchestration Server

connect_all_fixed.py19.6 kB
#!/usr/bin/env python3 """ CONNECT ALL FIXED - Final Working Version Connects all agents together with proper error handling """ import asyncio import sys import os import importlib.util import subprocess import time import requests import json from pathlib import Path from datetime import datetime from typing import Dict, List, Any, Optional # Add project paths sys.path.insert(0, str(Path(__file__).parent)) sys.path.insert(0, str(Path(__file__).parent / "agents")) class FinalAgentConnector: """Final working agent connector with all fixes applied.""" def __init__(self): self.agents = {} self.failed_agents = {} self.server_process = None # Fixed agent configurations with correct class names self.agent_configs = { "math_agent": { "path": "agents/specialized/math_agent.py", "class_name": "MathAgent", "priority": 1, "test_command": "Calculate 20% of 500" }, "document_agent": { "path": "agents/core/document_processor.py", "class_name": "DocumentProcessorAgent", "priority": 1, "test_command": "Analyze this text: Hello world" }, "gmail_agent": { "path": "agents/communication/real_gmail_agent.py", "class_name": "RealGmailAgent", "priority": 1, "test_command": "Send email to test@example.com" }, "calendar_agent": { "path": "agents/specialized/calendar_agent.py", "class_name": "CalendarAgent", "priority": 1, "test_command": "Create reminder for tomorrow" }, "weather_agent": { "path": "agents/data/realtime_weather_agent.py", "class_name": "RealTimeWeatherAgent", # Fixed class name "priority": 2, "test_command": "What is the weather in Mumbai?" } } def check_and_install_dependencies(self) -> bool: """Check and install missing dependencies.""" print("🔧 CHECKING DEPENDENCIES") print("=" * 50) required_packages = [ ("requests", "requests"), ("fastapi", "fastapi"), ("uvicorn", "uvicorn"), ("pymongo", "pymongo"), ("python-dotenv", "dotenv"), ("langchain", "langchain") ] missing = [] for package_name, import_name in required_packages: try: __import__(import_name) print(f"✅ {package_name}") except ImportError: print(f"❌ {package_name}") missing.append(package_name) if missing: print(f"\n🔧 Installing missing packages: {missing}") for package in missing: try: subprocess.check_call([sys.executable, "-m", "pip", "install", package]) print(f"✅ Installed {package}") except: print(f"❌ Failed to install {package}") return False print("✅ All dependencies ready") return True async def load_agents_safely(self) -> Dict[str, Any]: """Load agents with comprehensive error handling.""" print("\n🤖 LOADING AGENTS SAFELY") print("=" * 50) loaded_count = 0 for agent_id, config in self.agent_configs.items(): print(f"🔄 Loading {agent_id}...") try: agent_path = Path(config["path"]) if not agent_path.exists(): print(f"❌ {agent_id}: File not found") self.failed_agents[agent_id] = "File not found" continue # Dynamic import with error handling spec = importlib.util.spec_from_file_location(agent_id, agent_path) if spec is None or spec.loader is None: print(f"❌ {agent_id}: Cannot load module") self.failed_agents[agent_id] = "Cannot load module" continue module = importlib.util.module_from_spec(spec) sys.modules[agent_id] = module try: spec.loader.exec_module(module) except Exception as e: print(f"❌ {agent_id}: Module execution failed - {e}") self.failed_agents[agent_id] = f"Module execution failed: {e}" continue # Try to get the agent class agent_class = getattr(module, config["class_name"], None) if agent_class is None: # Try alternative class names alternative_names = [ config["class_name"].replace("Agent", ""), f"{agent_id.title().replace('_', '')}Agent", f"{agent_id.title().replace('_', '')}" ] for alt_name in alternative_names: agent_class = getattr(module, alt_name, None) if agent_class: print(f"✅ {agent_id}: Found class {alt_name}") break if agent_class is None: available_classes = [name for name in dir(module) if not name.startswith('_')] print(f"❌ {agent_id}: No suitable class found. Available: {available_classes}") self.failed_agents[agent_id] = f"Class not found. Available: {available_classes}" continue # Create agent instance try: agent_instance = agent_class() self.agents[agent_id] = { "instance": agent_instance, "config": config, "status": "loaded", "class_name": agent_class.__name__, "loaded_at": datetime.now().isoformat() } print(f"✅ {agent_id}: Loaded successfully ({agent_class.__name__})") loaded_count += 1 except Exception as e: print(f"❌ {agent_id}: Instance creation failed - {e}") self.failed_agents[agent_id] = f"Instance creation failed: {e}" continue except Exception as e: print(f"❌ {agent_id}: Unexpected error - {e}") self.failed_agents[agent_id] = f"Unexpected error: {e}" continue print(f"\n📊 Loaded {loaded_count} agents successfully") if self.failed_agents: print(f"🔒 Failed agents: {len(self.failed_agents)} (isolated)") for agent_id, error in self.failed_agents.items(): print(f" • {agent_id}: {error}") return self.agents async def start_mcp_server_robust(self) -> bool: """Start MCP server with robust error handling.""" print("\n🚀 STARTING MCP SERVER") print("=" * 50) # Check if already running try: response = requests.get("http://localhost:8000/api/health", timeout=3) if response.status_code == 200: print("✅ MCP Server already running") return True except: pass # Try different server files (prioritize simple server) server_files = [ "simple_mcp_server.py", "mcp_server.py", "core/mcp_server.py", "start_mcp_server.py" ] for server_file in server_files: if not Path(server_file).exists(): continue try: print(f"🔄 Starting {server_file}...") # Start server with proper environment env = os.environ.copy() env["PYTHONPATH"] = str(Path.cwd()) self.server_process = subprocess.Popen( [sys.executable, server_file], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env ) # Wait for server to be ready with longer timeout print("⏳ Waiting for server to start...") for attempt in range(45): # 45 seconds timeout try: response = requests.get("http://localhost:8000/api/health", timeout=2) if response.status_code == 200: print("✅ MCP Server started successfully") return True except: pass await asyncio.sleep(1) if attempt % 10 == 0: print(f"⏳ Still waiting... ({attempt}/45)") print("⚠️ Server started but not responding to health checks") # Try to check if server is at least running if self.server_process.poll() is None: print("✅ Server process is running, continuing...") return True else: print("❌ Server process died") continue except Exception as e: print(f"❌ Failed to start {server_file}: {e}") continue print("❌ Could not start any MCP server") return False async def test_system_integration(self) -> Dict[str, Any]: """Test the complete system integration.""" print("\n🧪 TESTING SYSTEM INTEGRATION") print("=" * 50) test_results = { "server_health": False, "agent_tests": {}, "api_tests": {} } # Test server health try: response = requests.get("http://localhost:8000/api/health", timeout=5) if response.status_code == 200: test_results["server_health"] = True health_data = response.json() print(f"✅ Server health: {health_data.get('status', 'unknown')}") else: print(f"❌ Server health check failed: HTTP {response.status_code}") except Exception as e: print(f"❌ Server health check error: {e}") # Test individual agents through API for agent_id, agent_data in self.agents.items(): if "test_command" in agent_data["config"]: command = agent_data["config"]["test_command"] print(f"🔍 Testing {agent_id}: {command[:30]}...") try: response = requests.post( "http://localhost:8000/api/mcp/command", json={"command": command}, timeout=15 ) if response.status_code == 200: result = response.json() status = result.get("status", "unknown") if status == "success": test_results["agent_tests"][agent_id] = "✅ Working" print(f"✅ {agent_id}: Working") else: test_results["agent_tests"][agent_id] = "⚠️ Limited" print(f"⚠️ {agent_id}: Limited - {result.get('message', 'Unknown')}") else: test_results["agent_tests"][agent_id] = f"❌ HTTP {response.status_code}" print(f"❌ {agent_id}: HTTP {response.status_code}") except Exception as e: test_results["agent_tests"][agent_id] = f"❌ Error" print(f"❌ {agent_id}: {str(e)[:50]}...") return test_results async def connect_all_final(self) -> Dict[str, Any]: """Final connection method that brings everything together.""" print("🔗 FINAL AGENT CONNECTION SYSTEM") print("=" * 80) print("🎯 Connecting all agents with comprehensive error handling") print("🛡️ Failed components are isolated automatically") print("🚀 System continues operating with available components") print("=" * 80) results = { "timestamp": datetime.now().isoformat(), "dependencies_ready": False, "agents_loaded": {}, "server_running": False, "integration_tests": {}, "system_operational": False, "working_agents": [], "failed_agents": [], "access_points": {} } try: # Step 1: Dependencies results["dependencies_ready"] = self.check_and_install_dependencies() if not results["dependencies_ready"]: return results # Step 2: Load agents results["agents_loaded"] = await self.load_agents_safely() results["working_agents"] = list(results["agents_loaded"].keys()) results["failed_agents"] = list(self.failed_agents.keys()) # Step 3: Start server results["server_running"] = await self.start_mcp_server_robust() # Step 4: Test integration if results["server_running"]: results["integration_tests"] = await self.test_system_integration() # Set access points results["access_points"] = { "web_interface": "http://localhost:8000", "health_check": "http://localhost:8000/api/health", "command_api": "http://localhost:8000/api/mcp/command", "docs": "http://localhost:8000/docs" } # Determine system status working_agents = len(results["working_agents"]) successful_tests = len([t for t in results["integration_tests"].get("agent_tests", {}).values() if "✅" in t]) results["system_operational"] = ( results["dependencies_ready"] and results["server_running"] and working_agents > 0 and (successful_tests > 0 or results["integration_tests"].get("server_health", False)) ) return results except Exception as e: print(f"❌ Fatal error in connection process: {e}") results["error"] = str(e) return results def cleanup(self): """Cleanup resources.""" if self.server_process: try: self.server_process.terminate() self.server_process.wait(timeout=5) except: try: self.server_process.kill() except: pass async def main(): """Main function.""" connector = FinalAgentConnector() try: results = await connector.connect_all_final() # Display comprehensive results print("\n" + "=" * 80) print("📊 FINAL CONNECTION RESULTS") print("=" * 80) print(f"🔧 Dependencies: {'✅ Ready' if results['dependencies_ready'] else '❌ Missing'}") print(f"🤖 Working Agents: {len(results['working_agents'])}") print(f"🔒 Failed Agents: {len(results['failed_agents'])}") print(f"🚀 MCP Server: {'✅ Running' if results['server_running'] else '❌ Failed'}") print(f"⚡ System: {'✅ OPERATIONAL' if results['system_operational'] else '❌ Limited'}") if results["working_agents"]: print(f"\n✅ WORKING AGENTS ({len(results['working_agents'])}):") for agent in results["working_agents"]: agent_data = results["agents_loaded"][agent] class_name = agent_data.get("class_name", "Unknown") print(f" • {agent} ({class_name})") if results["failed_agents"]: print(f"\n🔒 ISOLATED AGENTS ({len(results['failed_agents'])}):") for agent in results["failed_agents"]: error = connector.failed_agents.get(agent, "Unknown error") print(f" • {agent}: {error[:60]}...") if results["integration_tests"].get("agent_tests"): print(f"\n🧪 INTEGRATION TESTS:") for agent, status in results["integration_tests"]["agent_tests"].items(): print(f" • {agent}: {status}") if results["system_operational"]: print("\n🎉 SYSTEM FULLY OPERATIONAL!") print("✅ All working agents connected and tested") print("🛡️ Failed agents isolated - system continues smoothly") print("\n🌐 ACCESS POINTS:") for name, url in results["access_points"].items(): print(f" • {name.replace('_', ' ').title()}: {url}") print("\n🧪 TEST COMMANDS:") print(" # Test math agent") print(" curl -X POST http://localhost:8000/api/mcp/command \\") print(" -H 'Content-Type: application/json' \\") print(" -d '{\"command\": \"Calculate 20% of 500\"}'") print("\n # Test document agent") print(" curl -X POST http://localhost:8000/api/mcp/command \\") print(" -H 'Content-Type: application/json' \\") print(" -d '{\"command\": \"Analyze this text: Hello world\"}'") return True else: print("\n⚠️ PARTIAL OPERATION") print("Some components working but system needs attention") return False except KeyboardInterrupt: print("\n👋 Connection cancelled by user") return False except Exception as e: print(f"\n❌ Fatal error: {e}") import traceback traceback.print_exc() return False finally: connector.cleanup() if __name__ == "__main__": try: success = asyncio.run(main()) if success: print("\n🎉 ALL AGENTS CONNECTED SUCCESSFULLY!") print("🔗 Your unified MCP system is fully operational!") else: print("\n🔧 Connection completed with some issues.") print("💡 Check the messages above for details.") except Exception as e: print(f"\n❌ Connection failed: {e}") import traceback traceback.print_exc()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Nisarg-123-web/MCP2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server