Skip to main content
Glama

MCP Orchestration Server

mcp_mongodb_integration.py24.9 kB
# #!/usr/bin/env python3 # """ # MCP-MongoDB Integration Layer # Connects MCP agents with MongoDB storage # """ # import os # import asyncio # import logging # from datetime import datetime # from typing import Dict, List, Any, Optional # from dotenv import load_dotenv # # Load environment variables # load_dotenv() # try: # import pymongo # from pymongo import MongoClient # PYMONGO_AVAILABLE = True # except ImportError: # PYMONGO_AVAILABLE = False # class MCPMongoDBIntegration: # """Integration layer between MCP agents and MongoDB.""" # def __init__(self): # self.mongo_uri = os.getenv('MONGO_URI') or os.getenv('MONGODB_URI') # self.db_name = os.getenv('MONGO_DB_NAME', 'blackhole_db') # self.collection_name = os.getenv('MONGO_COLLECTION_NAME', 'agent_outputs') # self.client = None # self.db = None # self.collection = None # self.logger = self._setup_logging() # def _setup_logging(self) -> logging.Logger: # """Setup logging.""" # logger = logging.getLogger("mcp_mongodb_integration") # logger.setLevel(logging.INFO) # if not logger.handlers: # handler = logging.StreamHandler() # formatter = logging.Formatter( # '%(asctime)s - %(name)s - %(levelname)s - %(message)s' # ) # handler.setFormatter(formatter) # logger.addHandler(handler) # return logger # async def connect(self) -> bool: # """Connect to MongoDB.""" # if not PYMONGO_AVAILABLE: # self.logger.error("PyMongo not available") # return False # if not self.mongo_uri: # self.logger.error("MongoDB URI not configured") # return False # try: # self.client = MongoClient(self.mongo_uri, serverSelectionTimeoutMS=5000) # # Test connection # self.client.admin.command('ping') # # Get database and collection # self.db = self.client[self.db_name] # self.collection = self.db[self.collection_name] # self.logger.info("Connected to MongoDB successfully") # return True # except Exception as e: # self.logger.error(f"Failed to connect to MongoDB: {e}") # return False # async def save_agent_output(self, agent_id: str, input_data: Dict[str, Any], # output_data: Dict[str, Any], metadata: Dict[str, Any] = None) -> str: # """Save agent output to MongoDB.""" # if self.collection is None: # self.logger.warning("MongoDB not connected, cannot save agent output") # return f"mock_{datetime.now().timestamp()}" # try: # document = { # "agent": agent_id, # "agent_id": agent_id, # "input": input_data, # "output": output_data, # "metadata": metadata or {}, # "timestamp": datetime.now(), # "created_at": datetime.now() # } # result = self.collection.insert_one(document) # self.logger.info(f"Saved {agent_id} output to MongoDB: {result.inserted_id}") # return str(result.inserted_id) # except Exception as e: # self.logger.error(f"Error saving agent output: {e}") # return f"error_{datetime.now().timestamp()}" # async def store_command_result(self, command: str, agent_used: str, result: Dict[str, Any], timestamp: datetime) -> str: # """Store MCP command result in MongoDB.""" # if not self.db: # self.logger.warning("MongoDB not connected, cannot store command result") # return f"mock_{timestamp.timestamp()}" # try: # # Store in mcp_commands collection # commands_collection = self.db['mcp_commands'] # document = { # "command": command, # "agent_used": agent_used, # "result": result, # "timestamp": timestamp, # "created_at": datetime.now(), # "server": "embedded_mcp_server", # "storage_type": "command_result" # } # result_doc = commands_collection.insert_one(document) # # Also store in agent_outputs collection for compatibility # await self.save_agent_output( # agent_used, # {"command": command, "query": command, "type": "mcp_command"}, # result, # { # "command_timestamp": timestamp.isoformat(), # "server": "embedded_mcp_server", # "storage_type": "agent_output", # "mongodb_id": str(result_doc.inserted_id) # } # ) # self.logger.info(f"✅ Stored command result in MongoDB: {result_doc.inserted_id}") # return str(result_doc.inserted_id) # except Exception as e: # self.logger.error(f"❌ Error storing command result: {e}") # return f"error_{timestamp.timestamp()}" # async def force_store_result(self, agent_id: str, command: str, result: Dict[str, Any]) -> bool: # """Force store any result in MongoDB with multiple fallback methods.""" # if not self.db: # self.logger.warning("MongoDB not connected") # return False # try: # timestamp = datetime.now() # # Method 1: Store in mcp_commands # try: # commands_collection = self.db['mcp_commands'] # commands_collection.insert_one({ # "command": command, # "agent_used": agent_id, # "result": result, # "timestamp": timestamp, # "created_at": timestamp, # "server": "embedded_mcp_server", # "storage_method": "force_store" # }) # self.logger.info(f"✅ Force stored in mcp_commands: {agent_id}") # except Exception as e: # self.logger.error(f"Failed to store in mcp_commands: {e}") # # Method 2: Store in agent_outputs # try: # await self.save_agent_output( # agent_id, # {"command": command, "query": command, "type": "force_store"}, # result, # {"timestamp": timestamp.isoformat(), "storage_method": "force_store"} # ) # self.logger.info(f"✅ Force stored in agent_outputs: {agent_id}") # except Exception as e: # self.logger.error(f"Failed to store in agent_outputs: {e}") # # Method 3: Store in general results collection # try: # results_collection = self.db['all_results'] # results_collection.insert_one({ # "agent_id": agent_id, # "command": command, # "result": result, # "timestamp": timestamp, # "created_at": timestamp, # "storage_method": "force_store_fallback" # }) # self.logger.info(f"✅ Force stored in all_results: {agent_id}") # except Exception as e: # self.logger.error(f"Failed to store in all_results: {e}") # return True # except Exception as e: # self.logger.error(f"❌ Complete force store failure: {e}") # return False # async def process_document_with_agent(self, filename: str, content: str, # query: str = "analyze this document") -> Dict[str, Any]: # """Process document using document_processor agent and save to MongoDB.""" # try: # # Simulate document processing (since we don't have direct agent access yet) # input_data = { # "file": { # "name": filename, # "content": content[:500] + "..." if len(content) > 500 else content # }, # "query": query, # "processing_type": "document_analysis" # } # # Simulate document processor output # output_data = { # "extracted_text": content, # "word_count": len(content.split()) if content else 0, # "character_count": len(content), # "analysis": f"Document analysis for: {filename}", # "query_response": f"Processed query: {query}", # "detected_authors": self._extract_authors(content), # "summary": self._generate_summary(content), # "processing_status": "completed" # } # metadata = { # "filename": filename, # "file_size": len(content), # "processing_time": 0.1, # "agent_version": "1.0.0", # "processing_method": "mcp_integration" # } # # Save to MongoDB # mongodb_id = await self.save_agent_output( # "document_processor", # input_data, # output_data, # metadata # ) # return { # "status": "success", # "agent": "document_processor", # "mongodb_id": mongodb_id, # "output": output_data, # "metadata": metadata # } # except Exception as e: # self.logger.error(f"Error processing document: {e}") # return { # "status": "error", # "error": str(e), # "agent": "document_processor" # } # def _extract_authors(self, content: str) -> List[str]: # """Extract authors from content.""" # import re # authors = [] # # Look for author patterns # patterns = [ # r"Author[s]?:\s*([^\n]+)", # r"By:\s*([^\n]+)", # r"Written by:\s*([^\n]+)" # ] # for pattern in patterns: # matches = re.findall(pattern, content, re.IGNORECASE) # for match in matches: # author_names = [name.strip() for name in match.split(",")] # authors.extend(author_names) # return list(set([author for author in authors if author])) # def _generate_summary(self, content: str) -> str: # """Generate a simple summary.""" # if not content: # return "No content to summarize" # words = content.split() # if len(words) <= 50: # return content # # Simple summary: first 50 words # summary = " ".join(words[:50]) + "..." # return summary # async def process_pdf_document(self, filename: str, content: str) -> Dict[str, Any]: # """Process PDF document.""" # return await self.process_document_with_agent(filename, content, "extract and analyze PDF content") # async def process_ocr_image(self, filename: str, image_data: str) -> Dict[str, Any]: # """Process OCR image.""" # # Simulate OCR processing # input_data = { # "file": { # "name": filename, # "type": "image" # }, # "processing_type": "ocr_analysis" # } # output_data = { # "extracted_text": f"OCR extracted text from {filename}", # "confidence_score": 0.95, # "detected_language": "en", # "text_regions": 1, # "processing_status": "completed" # } # metadata = { # "filename": filename, # "image_format": "unknown", # "ocr_engine": "tesseract", # "processing_time": 0.2 # } # mongodb_id = await self.save_agent_output("ocr_processor", input_data, output_data, metadata) # return { # "status": "success", # "agent": "ocr_processor", # "mongodb_id": mongodb_id, # "output": output_data, # "metadata": metadata # } # async def get_agent_statistics(self) -> Dict[str, Any]: # """Get statistics about agent usage.""" # if not self.collection: # return {"error": "MongoDB not connected"} # try: # # Count documents by agent # pipeline = [ # {"$group": {"_id": "$agent", "count": {"$sum": 1}}}, # {"$sort": {"count": -1}} # ] # agent_counts = list(self.collection.aggregate(pipeline)) # # Total counts # total_docs = self.collection.count_documents({}) # doc_agents = self.collection.count_documents({"agent": {"$regex": "document", "$options": "i"}}) # pdf_agents = self.collection.count_documents({"agent": {"$regex": "pdf", "$options": "i"}}) # ocr_agents = self.collection.count_documents({"agent": {"$regex": "ocr", "$options": "i"}}) # return { # "total_documents": total_docs, # "agent_counts": {item["_id"]: item["count"] for item in agent_counts}, # "document_agents": doc_agents, # "pdf_agents": pdf_agents, # "ocr_agents": ocr_agents, # "last_updated": datetime.now().isoformat() # } # except Exception as e: # self.logger.error(f"Error getting statistics: {e}") # return {"error": str(e)} # def close(self): # """Close MongoDB connection.""" # if self.client: # self.client.close() # # Test function # async def test_integration(): # """Test the MCP-MongoDB integration.""" # print("🧪 Testing MCP-MongoDB Integration") # print("=" * 50) # integration = MCPMongoDBIntegration() # try: # # Connect # print("1️⃣ Connecting to MongoDB...") # if await integration.connect(): # print(" ✅ Connected successfully!") # else: # print(" ❌ Connection failed!") # return # # Test document processing # print("\n2️⃣ Testing document processing...") # test_content = """ # Title: Test Document # Author: John Doe # This is a test document for the MCP system. # It contains sample content to test document processing capabilities. # The document processor should extract authors and generate summaries. # """ # result = await integration.process_document_with_agent( # "test_document.txt", # test_content, # "who is the author and what is the summary" # ) # if result["status"] == "success": # print(f" ✅ Document processed! MongoDB ID: {result['mongodb_id']}") # print(f" 📄 Authors found: {result['output']['detected_authors']}") # print(f" 📝 Word count: {result['output']['word_count']}") # else: # print(f" ❌ Processing failed: {result.get('error', 'unknown error')}") # # Test PDF processing # print("\n3️⃣ Testing PDF processing...") # pdf_result = await integration.process_pdf_document( # "sample.pdf", # "This is sample PDF content for testing." # ) # if pdf_result["status"] == "success": # print(f" ✅ PDF processed! MongoDB ID: {pdf_result['mongodb_id']}") # # Test OCR processing # print("\n4️⃣ Testing OCR processing...") # ocr_result = await integration.process_ocr_image( # "sample_image.jpg", # "image_data_here" # ) # if ocr_result["status"] == "success": # print(f" ✅ OCR processed! MongoDB ID: {ocr_result['mongodb_id']}") # # Get updated statistics # print("\n5️⃣ Getting updated statistics...") # stats = await integration.get_agent_statistics() # if "error" not in stats: # print(f" 📊 Total documents: {stats['total_documents']}") # print(f" 📄 Document agents: {stats['document_agents']}") # print(f" 📄 PDF agents: {stats['pdf_agents']}") # print(f" 🖼️ OCR agents: {stats['ocr_agents']}") # print(f" 🤖 Agent breakdown: {stats['agent_counts']}") # print("\n✅ Integration test completed successfully!") # except Exception as e: # print(f"❌ Test failed: {e}") # finally: # integration.close() # if __name__ == "__main__": # asyncio.run(test_integration()) #!/usr/bin/env python3 """ MCP-MongoDB Integration Layer Connects MCP agents with MongoDB storage """ import os import asyncio import logging from datetime import datetime from typing import Dict, Any from dotenv import load_dotenv import unicodedata # Load environment variables load_dotenv() # Sanitize text for MongoDB storage def sanitize(data): if isinstance(data, str): return unicodedata.normalize("NFKD", data).encode("utf-8", "ignore").decode("utf-8") elif isinstance(data, dict): return {k: sanitize(v) for k, v in data.items()} elif isinstance(data, list): return [sanitize(i) for i in data] return data try: import pymongo from pymongo import MongoClient PYMONGO_AVAILABLE = True except ImportError: PYMONGO_AVAILABLE = False class MCPMongoDBIntegration: """Integration layer between MCP agents and MongoDB.""" def __init__(self): self.mongo_uri = os.getenv('MONGO_URI') or os.getenv('MONGODB_URI') self.db_name = os.getenv('MONGO_DB_NAME', 'blackhole_db') self.collection_name = os.getenv('MONGO_COLLECTION_NAME', 'agent_outputs') self.client = None self.db = None self.collection = None self.logger = self._setup_logging() def _setup_logging(self) -> logging.Logger: logger = logging.getLogger("mcp_mongodb_integration") logger.setLevel(logging.INFO) if not logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) return logger async def connect(self) -> bool: if not PYMONGO_AVAILABLE: self.logger.error("PyMongo not available") return False if not self.mongo_uri: self.logger.error("MongoDB URI not configured") return False try: self.client = MongoClient(self.mongo_uri, serverSelectionTimeoutMS=5000) self.client.admin.command('ping') self.db = self.client[self.db_name] self.collection = self.db[self.collection_name] self.logger.info("Connected to MongoDB successfully") return True except Exception as e: self.logger.error(f"Failed to connect to MongoDB: {e}") return False async def save_agent_output(self, agent_id: str, input_data: Dict[str, Any], output_data: Dict[str, Any], metadata: Dict[str, Any] = None) -> str: if self.collection is None: self.logger.warning("MongoDB not connected, cannot save agent output") return f"mock_{datetime.now().timestamp()}" try: input_data = sanitize(input_data) output_data = sanitize(output_data) document = { "agent": agent_id, "agent_id": agent_id, "input": input_data, "output": output_data, "metadata": metadata or {}, "timestamp": datetime.now(), "created_at": datetime.now() } try: print("📦 MongoDB insert payload:", document) except Exception as e: print(f"[Debug Print Failed]: {e}") result = self.collection.insert_one(document) self.logger.info(f"Saved {agent_id} output to MongoDB: {result.inserted_id}") return str(result.inserted_id) except Exception as e: self.logger.error(f"Error saving agent output: {e}") return f"error_{datetime.now().timestamp()}" async def store_command_result(self, command: str, agent_used: str, result: Dict[str, Any], timestamp: datetime) -> str: if self.db is None: self.logger.warning("MongoDB not connected, cannot store command result") return f"mock_{timestamp.timestamp()}" try: commands_collection = self.db['mcp_commands'] document = { "command": command, "agent_used": agent_used, "result": result, "timestamp": timestamp, "created_at": datetime.now(), "server": "embedded_mcp_server", "storage_type": "command_result" } print("📦 store_command_result document:", document) result_doc = commands_collection.insert_one(document) await self.save_agent_output( agent_used, {"command": command, "query": command, "type": "mcp_command"}, result, { "command_timestamp": timestamp.isoformat(), "server": "embedded_mcp_server", "storage_type": "agent_output", "mongodb_id": str(result_doc.inserted_id) } ) self.logger.info(f"✅ Stored command result in MongoDB: {result_doc.inserted_id}") return str(result_doc.inserted_id) except Exception as e: self.logger.error(f"❌ Error storing command result: {e}") return f"error_{timestamp.timestamp()}" async def force_store_result(self, agent_id: str, command: str, result: Dict[str, Any]) -> bool: if self.db is None: self.logger.warning("MongoDB not connected") return False try: timestamp = datetime.now() # Store in mcp_commands try: commands_collection = self.db['mcp_commands'] doc = { "command": command, "agent_used": agent_id, "result": result, "timestamp": timestamp, "created_at": timestamp, "server": "embedded_mcp_server", "storage_method": "force_store" } print("📦 force_store_result mcp_commands:", doc) commands_collection.insert_one(doc) self.logger.info(f"✅ Force stored in mcp_commands: {agent_id}") except Exception as e: self.logger.error(f"Failed to store in mcp_commands: {e}") # Store in agent_outputs try: await self.save_agent_output( agent_id, {"command": command, "query": command, "type": "force_store"}, result, {"timestamp": timestamp.isoformat(), "storage_method": "force_store"} ) self.logger.info(f"✅ Force stored in agent_outputs: {agent_id}") except Exception as e: self.logger.error(f"Failed to store in agent_outputs: {e}") # Store in all_results try: results_collection = self.db['all_results'] doc2 = { "agent_id": agent_id, "command": command, "result": result, "timestamp": timestamp, "created_at": timestamp, "storage_method": "force_store_fallback" } print("📦 force_store_result all_results:", doc2) results_collection.insert_one(doc2) self.logger.info(f"✅ Force stored in all_results: {agent_id}") except Exception as e: self.logger.error(f"Failed to store in all_results: {e}") return True except Exception as e: self.logger.error(f"❌ Complete force store failure: {e}") return False

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Nisarg-123-web/MCP2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server