connect_all_fixed.py•19.6 kB
#!/usr/bin/env python3
"""
CONNECT ALL FIXED - Final Working Version
Connects all agents together with proper error handling
"""
import asyncio
import sys
import os
import importlib.util
import subprocess
import time
import requests
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional
# Add project paths
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(Path(__file__).parent / "agents"))
class FinalAgentConnector:
"""Final working agent connector with all fixes applied."""
def __init__(self):
self.agents = {}
self.failed_agents = {}
self.server_process = None
# Fixed agent configurations with correct class names
self.agent_configs = {
"math_agent": {
"path": "agents/specialized/math_agent.py",
"class_name": "MathAgent",
"priority": 1,
"test_command": "Calculate 20% of 500"
},
"document_agent": {
"path": "agents/core/document_processor.py",
"class_name": "DocumentProcessorAgent",
"priority": 1,
"test_command": "Analyze this text: Hello world"
},
"gmail_agent": {
"path": "agents/communication/real_gmail_agent.py",
"class_name": "RealGmailAgent",
"priority": 1,
"test_command": "Send email to test@example.com"
},
"calendar_agent": {
"path": "agents/specialized/calendar_agent.py",
"class_name": "CalendarAgent",
"priority": 1,
"test_command": "Create reminder for tomorrow"
},
"weather_agent": {
"path": "agents/data/realtime_weather_agent.py",
"class_name": "RealTimeWeatherAgent", # Fixed class name
"priority": 2,
"test_command": "What is the weather in Mumbai?"
}
}
def check_and_install_dependencies(self) -> bool:
"""Check and install missing dependencies."""
print("🔧 CHECKING DEPENDENCIES")
print("=" * 50)
required_packages = [
("requests", "requests"),
("fastapi", "fastapi"),
("uvicorn", "uvicorn"),
("pymongo", "pymongo"),
("python-dotenv", "dotenv"),
("langchain", "langchain")
]
missing = []
for package_name, import_name in required_packages:
try:
__import__(import_name)
print(f"✅ {package_name}")
except ImportError:
print(f"❌ {package_name}")
missing.append(package_name)
if missing:
print(f"\n🔧 Installing missing packages: {missing}")
for package in missing:
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", package])
print(f"✅ Installed {package}")
except:
print(f"❌ Failed to install {package}")
return False
print("✅ All dependencies ready")
return True
async def load_agents_safely(self) -> Dict[str, Any]:
"""Load agents with comprehensive error handling."""
print("\n🤖 LOADING AGENTS SAFELY")
print("=" * 50)
loaded_count = 0
for agent_id, config in self.agent_configs.items():
print(f"🔄 Loading {agent_id}...")
try:
agent_path = Path(config["path"])
if not agent_path.exists():
print(f"❌ {agent_id}: File not found")
self.failed_agents[agent_id] = "File not found"
continue
# Dynamic import with error handling
spec = importlib.util.spec_from_file_location(agent_id, agent_path)
if spec is None or spec.loader is None:
print(f"❌ {agent_id}: Cannot load module")
self.failed_agents[agent_id] = "Cannot load module"
continue
module = importlib.util.module_from_spec(spec)
sys.modules[agent_id] = module
try:
spec.loader.exec_module(module)
except Exception as e:
print(f"❌ {agent_id}: Module execution failed - {e}")
self.failed_agents[agent_id] = f"Module execution failed: {e}"
continue
# Try to get the agent class
agent_class = getattr(module, config["class_name"], None)
if agent_class is None:
# Try alternative class names
alternative_names = [
config["class_name"].replace("Agent", ""),
f"{agent_id.title().replace('_', '')}Agent",
f"{agent_id.title().replace('_', '')}"
]
for alt_name in alternative_names:
agent_class = getattr(module, alt_name, None)
if agent_class:
print(f"✅ {agent_id}: Found class {alt_name}")
break
if agent_class is None:
available_classes = [name for name in dir(module) if not name.startswith('_')]
print(f"❌ {agent_id}: No suitable class found. Available: {available_classes}")
self.failed_agents[agent_id] = f"Class not found. Available: {available_classes}"
continue
# Create agent instance
try:
agent_instance = agent_class()
self.agents[agent_id] = {
"instance": agent_instance,
"config": config,
"status": "loaded",
"class_name": agent_class.__name__,
"loaded_at": datetime.now().isoformat()
}
print(f"✅ {agent_id}: Loaded successfully ({agent_class.__name__})")
loaded_count += 1
except Exception as e:
print(f"❌ {agent_id}: Instance creation failed - {e}")
self.failed_agents[agent_id] = f"Instance creation failed: {e}"
continue
except Exception as e:
print(f"❌ {agent_id}: Unexpected error - {e}")
self.failed_agents[agent_id] = f"Unexpected error: {e}"
continue
print(f"\n📊 Loaded {loaded_count} agents successfully")
if self.failed_agents:
print(f"🔒 Failed agents: {len(self.failed_agents)} (isolated)")
for agent_id, error in self.failed_agents.items():
print(f" • {agent_id}: {error}")
return self.agents
async def start_mcp_server_robust(self) -> bool:
"""Start MCP server with robust error handling."""
print("\n🚀 STARTING MCP SERVER")
print("=" * 50)
# Check if already running
try:
response = requests.get("http://localhost:8000/api/health", timeout=3)
if response.status_code == 200:
print("✅ MCP Server already running")
return True
except:
pass
# Try different server files (prioritize simple server)
server_files = [
"simple_mcp_server.py",
"mcp_server.py",
"core/mcp_server.py",
"start_mcp_server.py"
]
for server_file in server_files:
if not Path(server_file).exists():
continue
try:
print(f"🔄 Starting {server_file}...")
# Start server with proper environment
env = os.environ.copy()
env["PYTHONPATH"] = str(Path.cwd())
self.server_process = subprocess.Popen(
[sys.executable, server_file],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=env
)
# Wait for server to be ready with longer timeout
print("⏳ Waiting for server to start...")
for attempt in range(45): # 45 seconds timeout
try:
response = requests.get("http://localhost:8000/api/health", timeout=2)
if response.status_code == 200:
print("✅ MCP Server started successfully")
return True
except:
pass
await asyncio.sleep(1)
if attempt % 10 == 0:
print(f"⏳ Still waiting... ({attempt}/45)")
print("⚠️ Server started but not responding to health checks")
# Try to check if server is at least running
if self.server_process.poll() is None:
print("✅ Server process is running, continuing...")
return True
else:
print("❌ Server process died")
continue
except Exception as e:
print(f"❌ Failed to start {server_file}: {e}")
continue
print("❌ Could not start any MCP server")
return False
async def test_system_integration(self) -> Dict[str, Any]:
"""Test the complete system integration."""
print("\n🧪 TESTING SYSTEM INTEGRATION")
print("=" * 50)
test_results = {
"server_health": False,
"agent_tests": {},
"api_tests": {}
}
# Test server health
try:
response = requests.get("http://localhost:8000/api/health", timeout=5)
if response.status_code == 200:
test_results["server_health"] = True
health_data = response.json()
print(f"✅ Server health: {health_data.get('status', 'unknown')}")
else:
print(f"❌ Server health check failed: HTTP {response.status_code}")
except Exception as e:
print(f"❌ Server health check error: {e}")
# Test individual agents through API
for agent_id, agent_data in self.agents.items():
if "test_command" in agent_data["config"]:
command = agent_data["config"]["test_command"]
print(f"🔍 Testing {agent_id}: {command[:30]}...")
try:
response = requests.post(
"http://localhost:8000/api/mcp/command",
json={"command": command},
timeout=15
)
if response.status_code == 200:
result = response.json()
status = result.get("status", "unknown")
if status == "success":
test_results["agent_tests"][agent_id] = "✅ Working"
print(f"✅ {agent_id}: Working")
else:
test_results["agent_tests"][agent_id] = "⚠️ Limited"
print(f"⚠️ {agent_id}: Limited - {result.get('message', 'Unknown')}")
else:
test_results["agent_tests"][agent_id] = f"❌ HTTP {response.status_code}"
print(f"❌ {agent_id}: HTTP {response.status_code}")
except Exception as e:
test_results["agent_tests"][agent_id] = f"❌ Error"
print(f"❌ {agent_id}: {str(e)[:50]}...")
return test_results
async def connect_all_final(self) -> Dict[str, Any]:
"""Final connection method that brings everything together."""
print("🔗 FINAL AGENT CONNECTION SYSTEM")
print("=" * 80)
print("🎯 Connecting all agents with comprehensive error handling")
print("🛡️ Failed components are isolated automatically")
print("🚀 System continues operating with available components")
print("=" * 80)
results = {
"timestamp": datetime.now().isoformat(),
"dependencies_ready": False,
"agents_loaded": {},
"server_running": False,
"integration_tests": {},
"system_operational": False,
"working_agents": [],
"failed_agents": [],
"access_points": {}
}
try:
# Step 1: Dependencies
results["dependencies_ready"] = self.check_and_install_dependencies()
if not results["dependencies_ready"]:
return results
# Step 2: Load agents
results["agents_loaded"] = await self.load_agents_safely()
results["working_agents"] = list(results["agents_loaded"].keys())
results["failed_agents"] = list(self.failed_agents.keys())
# Step 3: Start server
results["server_running"] = await self.start_mcp_server_robust()
# Step 4: Test integration
if results["server_running"]:
results["integration_tests"] = await self.test_system_integration()
# Set access points
results["access_points"] = {
"web_interface": "http://localhost:8000",
"health_check": "http://localhost:8000/api/health",
"command_api": "http://localhost:8000/api/mcp/command",
"docs": "http://localhost:8000/docs"
}
# Determine system status
working_agents = len(results["working_agents"])
successful_tests = len([t for t in results["integration_tests"].get("agent_tests", {}).values() if "✅" in t])
results["system_operational"] = (
results["dependencies_ready"] and
results["server_running"] and
working_agents > 0 and
(successful_tests > 0 or results["integration_tests"].get("server_health", False))
)
return results
except Exception as e:
print(f"❌ Fatal error in connection process: {e}")
results["error"] = str(e)
return results
def cleanup(self):
"""Cleanup resources."""
if self.server_process:
try:
self.server_process.terminate()
self.server_process.wait(timeout=5)
except:
try:
self.server_process.kill()
except:
pass
async def main():
"""Main function."""
connector = FinalAgentConnector()
try:
results = await connector.connect_all_final()
# Display comprehensive results
print("\n" + "=" * 80)
print("📊 FINAL CONNECTION RESULTS")
print("=" * 80)
print(f"🔧 Dependencies: {'✅ Ready' if results['dependencies_ready'] else '❌ Missing'}")
print(f"🤖 Working Agents: {len(results['working_agents'])}")
print(f"🔒 Failed Agents: {len(results['failed_agents'])}")
print(f"🚀 MCP Server: {'✅ Running' if results['server_running'] else '❌ Failed'}")
print(f"⚡ System: {'✅ OPERATIONAL' if results['system_operational'] else '❌ Limited'}")
if results["working_agents"]:
print(f"\n✅ WORKING AGENTS ({len(results['working_agents'])}):")
for agent in results["working_agents"]:
agent_data = results["agents_loaded"][agent]
class_name = agent_data.get("class_name", "Unknown")
print(f" • {agent} ({class_name})")
if results["failed_agents"]:
print(f"\n🔒 ISOLATED AGENTS ({len(results['failed_agents'])}):")
for agent in results["failed_agents"]:
error = connector.failed_agents.get(agent, "Unknown error")
print(f" • {agent}: {error[:60]}...")
if results["integration_tests"].get("agent_tests"):
print(f"\n🧪 INTEGRATION TESTS:")
for agent, status in results["integration_tests"]["agent_tests"].items():
print(f" • {agent}: {status}")
if results["system_operational"]:
print("\n🎉 SYSTEM FULLY OPERATIONAL!")
print("✅ All working agents connected and tested")
print("🛡️ Failed agents isolated - system continues smoothly")
print("\n🌐 ACCESS POINTS:")
for name, url in results["access_points"].items():
print(f" • {name.replace('_', ' ').title()}: {url}")
print("\n🧪 TEST COMMANDS:")
print(" # Test math agent")
print(" curl -X POST http://localhost:8000/api/mcp/command \\")
print(" -H 'Content-Type: application/json' \\")
print(" -d '{\"command\": \"Calculate 20% of 500\"}'")
print("\n # Test document agent")
print(" curl -X POST http://localhost:8000/api/mcp/command \\")
print(" -H 'Content-Type: application/json' \\")
print(" -d '{\"command\": \"Analyze this text: Hello world\"}'")
return True
else:
print("\n⚠️ PARTIAL OPERATION")
print("Some components working but system needs attention")
return False
except KeyboardInterrupt:
print("\n👋 Connection cancelled by user")
return False
except Exception as e:
print(f"\n❌ Fatal error: {e}")
import traceback
traceback.print_exc()
return False
finally:
connector.cleanup()
if __name__ == "__main__":
try:
success = asyncio.run(main())
if success:
print("\n🎉 ALL AGENTS CONNECTED SUCCESSFULLY!")
print("🔗 Your unified MCP system is fully operational!")
else:
print("\n🔧 Connection completed with some issues.")
print("💡 Check the messages above for details.")
except Exception as e:
print(f"\n❌ Connection failed: {e}")
import traceback
traceback.print_exc()