Skip to main content
Glama

MCP Orchestration Server

connect_all_agents_mongodb.py19.4 kB
#!/usr/bin/env python3 """ Connect All Agents to MongoDB - Comprehensive Integration Script Ensures all agents are properly connected to MongoDB and storing data consistently """ import os import sys import asyncio import logging from datetime import datetime from pathlib import Path # Add project paths sys.path.insert(0, str(Path(__file__).parent)) sys.path.insert(0, str(Path(__file__).parent / "blackhole_core" / "data_source")) # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("mongodb_connector") class MongoDBAgentConnector: """Comprehensive MongoDB connector for all agents.""" def __init__(self): self.logger = logger self.mongodb_integration = None self.agents = {} self.connection_status = {} self.storage_stats = { "total_stored": 0, "successful_stores": 0, "failed_stores": 0, "agents_connected": 0 } async def initialize_mongodb(self): """Initialize MongoDB integration.""" try: from mcp_mongodb_integration import MCPMongoDBIntegration self.mongodb_integration = MCPMongoDBIntegration() connected = await self.mongodb_integration.connect() if connected: self.logger.info("✅ MongoDB integration initialized successfully") return True else: self.logger.error("❌ MongoDB connection failed") return False except ImportError as e: self.logger.error(f"❌ MongoDB integration not available: {e}") return False except Exception as e: self.logger.error(f"❌ MongoDB initialization error: {e}") return False async def load_all_agents(self): """Load all available agents.""" try: # Import agent manager from agent_manager import AgentManager agent_manager = AgentManager() await agent_manager.discover_agents() self.agents = agent_manager.loaded_agents self.logger.info(f"✅ Loaded {len(self.agents)} agents") for agent_id, agent_info in self.agents.items(): self.logger.info(f" 📋 {agent_id}: {agent_info.get('metadata', {}).get('name', 'Unknown')}") return True except Exception as e: self.logger.error(f"❌ Failed to load agents: {e}") return False async def check_agent_mongodb_connection(self, agent_id: str, agent_instance): """Check if an agent is properly connected to MongoDB.""" try: # Check if agent has MongoDB integration if hasattr(agent_instance, 'mongodb_integration') and agent_instance.mongodb_integration: # Test the connection if hasattr(agent_instance.mongodb_integration, 'collection') and agent_instance.mongodb_integration.collection: self.connection_status[agent_id] = "connected" self.storage_stats["agents_connected"] += 1 self.logger.info(f"✅ {agent_id}: MongoDB connected") return True else: self.connection_status[agent_id] = "disconnected" self.logger.warning(f"⚠️ {agent_id}: MongoDB integration exists but not connected") return False else: self.connection_status[agent_id] = "no_integration" self.logger.warning(f"⚠️ {agent_id}: No MongoDB integration found") return False except Exception as e: self.connection_status[agent_id] = f"error: {str(e)}" self.logger.error(f"❌ {agent_id}: Connection check failed - {e}") return False async def test_agent_storage(self, agent_id: str, agent_instance): """Test if an agent can store data in MongoDB.""" try: if not hasattr(agent_instance, 'mongodb_integration') or not agent_instance.mongodb_integration: return False # Create test data test_input = { "test": True, "agent_id": agent_id, "timestamp": datetime.now().isoformat(), "test_type": "mongodb_connection_test" } test_output = { "status": "success", "message": f"MongoDB connection test for {agent_id}", "test_result": "storage_test_passed", "timestamp": datetime.now().isoformat() } test_metadata = { "test_type": "mongodb_storage_test", "agent_version": getattr(agent_instance, 'version', '1.0.0'), "storage_method": "connection_test" } # Test primary storage method mongodb_id = await agent_instance.mongodb_integration.save_agent_output( agent_id, test_input, test_output, test_metadata ) if mongodb_id and "error" not in str(mongodb_id): self.storage_stats["successful_stores"] += 1 self.storage_stats["total_stored"] += 1 self.logger.info(f"✅ {agent_id}: Storage test passed - {mongodb_id}") return True else: self.storage_stats["failed_stores"] += 1 self.storage_stats["total_stored"] += 1 self.logger.error(f"❌ {agent_id}: Storage test failed - {mongodb_id}") return False except Exception as e: self.storage_stats["failed_stores"] += 1 self.storage_stats["total_stored"] += 1 self.logger.error(f"❌ {agent_id}: Storage test error - {e}") return False async def ensure_agent_mongodb_connection(self, agent_id: str, agent_instance): """Ensure an agent is properly connected to MongoDB.""" try: # Check if agent already has MongoDB integration if not hasattr(agent_instance, 'mongodb_integration') or not agent_instance.mongodb_integration: # Initialize MongoDB integration for the agent try: from mcp_mongodb_integration import MCPMongoDBIntegration agent_instance.mongodb_integration = MCPMongoDBIntegration() connected = await agent_instance.mongodb_integration.connect() if connected: self.logger.info(f"✅ {agent_id}: MongoDB integration initialized") else: self.logger.error(f"❌ {agent_id}: Failed to initialize MongoDB integration") return False except Exception as e: self.logger.error(f"❌ {agent_id}: MongoDB integration initialization error - {e}") return False # Test the connection connection_ok = await self.check_agent_mongodb_connection(agent_id, agent_instance) if not connection_ok: # Try to reconnect try: connected = await agent_instance.mongodb_integration.connect() if connected: self.logger.info(f"✅ {agent_id}: MongoDB reconnected successfully") connection_ok = True else: self.logger.error(f"❌ {agent_id}: MongoDB reconnection failed") return False except Exception as e: self.logger.error(f"❌ {agent_id}: MongoDB reconnection error - {e}") return False # Test storage capability storage_ok = await self.test_agent_storage(agent_id, agent_instance) return connection_ok and storage_ok except Exception as e: self.logger.error(f"❌ {agent_id}: Ensure connection error - {e}") return False async def connect_all_agents(self): """Connect all agents to MongoDB.""" self.logger.info("🔗 CONNECTING ALL AGENTS TO MONGODB") self.logger.info("=" * 60) # Initialize MongoDB mongodb_ok = await self.initialize_mongodb() if not mongodb_ok: self.logger.error("❌ Cannot proceed without MongoDB connection") return False # Load all agents agents_loaded = await self.load_all_agents() if not agents_loaded: self.logger.error("❌ Cannot proceed without agents") return False # Connect each agent to MongoDB success_count = 0 total_agents = len(self.agents) for agent_id, agent_info in self.agents.items(): self.logger.info(f"\n🔌 Connecting {agent_id} to MongoDB...") try: agent_instance = agent_info.get("instance") if agent_instance: success = await self.ensure_agent_mongodb_connection(agent_id, agent_instance) if success: success_count += 1 self.logger.info(f"✅ {agent_id}: Successfully connected to MongoDB") else: self.logger.error(f"❌ {agent_id}: Failed to connect to MongoDB") else: self.logger.error(f"❌ {agent_id}: No agent instance found") except Exception as e: self.logger.error(f"❌ {agent_id}: Connection error - {e}") # Generate summary report self.logger.info("\n" + "=" * 60) self.logger.info("📊 MONGODB CONNECTION SUMMARY") self.logger.info("=" * 60) self.logger.info(f"✅ Total Agents: {total_agents}") self.logger.info(f"✅ Successfully Connected: {success_count}") self.logger.info(f"❌ Failed Connections: {total_agents - success_count}") self.logger.info(f"📈 Success Rate: {(success_count/total_agents)*100:.1f}%") self.logger.info(f"\n📊 STORAGE TEST RESULTS:") self.logger.info(f"✅ Successful Stores: {self.storage_stats['successful_stores']}") self.logger.info(f"❌ Failed Stores: {self.storage_stats['failed_stores']}") self.logger.info(f"📊 Total Tests: {self.storage_stats['total_stored']}") if self.storage_stats['total_stored'] > 0: storage_success_rate = (self.storage_stats['successful_stores'] / self.storage_stats['total_stored']) * 100 self.logger.info(f"📈 Storage Success Rate: {storage_success_rate:.1f}%") # Show connection status for each agent self.logger.info(f"\n📋 AGENT CONNECTION STATUS:") for agent_id, status in self.connection_status.items(): status_icon = "✅" if status == "connected" else "❌" self.logger.info(f" {status_icon} {agent_id}: {status}") return success_count == total_agents async def verify_data_storage(self): """Verify that data is being stored properly by testing each agent.""" self.logger.info("\n🧪 VERIFYING DATA STORAGE") self.logger.info("=" * 60) test_queries = { "math_agent": "Calculate 10 + 5", "weather_agent": "Weather in Mumbai", "document_agent": "Analyze this text: Hello world, this is a test document for MongoDB storage verification." } verification_results = {} for agent_id, test_query in test_queries.items(): if agent_id in self.agents: self.logger.info(f"\n🧪 Testing {agent_id} with query: '{test_query}'") try: agent_instance = self.agents[agent_id]["instance"] # Create test message from agents.base_agent import MCPMessage message = MCPMessage( id=f"test_{agent_id}_{datetime.now().timestamp()}", method="process", params={"query": test_query, "expression": test_query}, timestamp=datetime.now() ) # Process the message result = await agent_instance.process_message(message) if result.get("status") == "success": verification_results[agent_id] = "✅ PASSED" self.logger.info(f"✅ {agent_id}: Test query processed successfully") else: verification_results[agent_id] = "❌ FAILED" self.logger.error(f"❌ {agent_id}: Test query failed") except Exception as e: verification_results[agent_id] = f"❌ ERROR: {str(e)}" self.logger.error(f"❌ {agent_id}: Test error - {e}") else: verification_results[agent_id] = "❌ NOT LOADED" self.logger.warning(f"⚠️ {agent_id}: Agent not loaded") # Show verification results self.logger.info(f"\n📊 VERIFICATION RESULTS:") for agent_id, result in verification_results.items(): self.logger.info(f" {result.split()[0]} {agent_id}: {result}") return verification_results async def main(): """Main function to connect all agents to MongoDB.""" print("🔗 MONGODB AGENT CONNECTOR") print("=" * 80) print("🎯 Connecting all agents to MongoDB for comprehensive data storage") print("💾 Ensuring consistent data persistence across all agents") print("🧪 Testing storage capabilities and data verification") print("=" * 80) connector = MongoDBAgentConnector() try: # Connect all agents to MongoDB success = await connector.connect_all_agents() if success: print("\n🎉 ALL AGENTS SUCCESSFULLY CONNECTED TO MONGODB!") # Verify data storage with test queries await connector.verify_data_storage() # Monitor storage health await connector.monitor_storage_health() # Get storage statistics await connector.get_storage_statistics() print("\n" + "=" * 80) print("✅ MONGODB INTEGRATION COMPLETE") print("=" * 80) print("🎯 All agents are now connected to MongoDB") print("💾 Data storage is working across all agents") print("🧪 Storage verification tests completed") print("💊 Health monitoring active") print("📊 Storage statistics available") print("🚀 Your system is ready for production use") else: print("\n⚠️ SOME AGENTS FAILED TO CONNECT TO MONGODB") print("🔧 Check the logs above for specific issues") print("💡 The system will still work, but some data may not be stored") except Exception as e: logger.error(f"❌ Main execution error: {e}") print(f"\n❌ MONGODB CONNECTION FAILED: {e}") print("🔧 Check your MongoDB configuration and try again") async def monitor_storage_health(self): """Monitor MongoDB storage health across all agents.""" self.logger.info("\n💊 MONGODB STORAGE HEALTH MONITORING") self.logger.info("=" * 60) health_results = {} for agent_id, agent_info in self.agents.items(): try: agent_instance = agent_info.get("instance") if agent_instance and hasattr(agent_instance, 'health_check'): health = await agent_instance.health_check() mongodb_status = health.get('mongodb_connected', False) health_results[agent_id] = { "mongodb_connected": mongodb_status, "status": health.get('status', 'unknown'), "failure_count": health.get('failure_count', 0), "last_check": health.get('last_check', 'unknown') } status_icon = "✅" if mongodb_status else "❌" self.logger.info(f" {status_icon} {agent_id}: MongoDB {mongodb_status}, Status: {health.get('status', 'unknown')}") else: health_results[agent_id] = {"error": "No health check available"} self.logger.warning(f" ⚠️ {agent_id}: No health check method available") except Exception as e: health_results[agent_id] = {"error": str(e)} self.logger.error(f" ❌ {agent_id}: Health check error - {e}") return health_results async def get_storage_statistics(self): """Get comprehensive storage statistics from MongoDB.""" if not self.mongodb_integration or not self.mongodb_integration.db: self.logger.warning("MongoDB not available for statistics") return {} try: self.logger.info("\n📊 MONGODB STORAGE STATISTICS") self.logger.info("=" * 60) db = self.mongodb_integration.db # Get collection statistics collections = await asyncio.to_thread(db.list_collection_names) stats = {} for collection_name in collections: collection = db[collection_name] count = await asyncio.to_thread(collection.count_documents, {}) stats[collection_name] = count self.logger.info(f" 📋 {collection_name}: {count} documents") # Get agent-specific statistics if 'agent_outputs' in collections: agent_outputs = db['agent_outputs'] # Count by agent pipeline = [ {"$group": {"_id": "$agent_id", "count": {"$sum": 1}}}, {"$sort": {"count": -1}} ] agent_counts = await asyncio.to_thread( lambda: list(agent_outputs.aggregate(pipeline)) ) self.logger.info(f"\n📊 DOCUMENTS BY AGENT:") for item in agent_counts: agent_id = item.get('_id', 'unknown') count = item.get('count', 0) self.logger.info(f" 🤖 {agent_id}: {count} documents") return stats except Exception as e: self.logger.error(f"❌ Error getting storage statistics: {e}") return {} if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Nisarg-123-web/MCP2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server