# mongodb_connector.py (16.8 kB)
"""
MongoDB Connector Module for MCP Server
Handles MongoDB connections with proper error handling and data transformation
"""
import logging
import os
import re
import time
from datetime import datetime
from typing import Dict, List, Any, Optional

from bson import ObjectId
from dotenv import load_dotenv
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
# Load environment variables
# NOTE: runs at import time, before MongoDBConnector reads MONGODB_URI and
# MONGODB_DATABASE through os.getenv (presumably populated from a .env file
# by python-dotenv -- confirm against the deployment setup).
load_dotenv()
# Setup logging
# NOTE: basicConfig is also an import-time side effect; it sets the root
# logger level to INFO for whichever process imports this module.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by every function and method in this file.
logger = logging.getLogger(__name__)
class MongoDBConnector:
    """
    MongoDB connector for RTIdeas MCP server.

    Manages connections to MongoDB collections for sessions, ideas, and connections.

    Configuration is read from the ``MONGODB_URI`` (required) and
    ``MONGODB_DATABASE`` (defaults to ``"test"``) environment variables.
    The public read methods never raise on query failure: they log the error
    and return an empty list, ``None`` or an ``{"error": ...}`` dictionary.
    """

    def __init__(self):
        """
        Initialize MongoDB connector and open the connection.

        Raises:
            ValueError: If ``MONGODB_URI`` is not set.
            ConnectionFailure: If the server cannot be reached after all
                connection attempts (includes ServerSelectionTimeoutError).
        """
        self.client = None
        self.db = None
        # Load MongoDB configuration from environment variables
        self.mongodb_uri = os.getenv("MONGODB_URI")
        self.database_name = os.getenv("MONGODB_DATABASE", "test")
        if not self.mongodb_uri:
            logger.error("MONGODB_URI environment variable is required")
            raise ValueError("MONGODB_URI must be set in environment variables")
        # Initialize connection
        self._initialize_connection()

    def _initialize_connection(self):
        """
        Initialize MongoDB connection with retry logic.

        Makes up to three attempts, sleeping ``2 ** attempt`` seconds between
        them (2s, then 4s). A client whose ping fails is closed before the
        next attempt so failed attempts do not accumulate open clients.

        Raises:
            ConnectionFailure: Re-raised from the last attempt when every
                attempt failed (includes ServerSelectionTimeoutError).
        """
        max_retries = 3
        for attempt in range(1, max_retries + 1):
            client = None
            try:
                client = MongoClient(
                    self.mongodb_uri,
                    serverSelectionTimeoutMS=5000,  # 5 second timeout
                    connectTimeoutMS=10000,  # 10 second timeout
                    socketTimeoutMS=20000,  # 20 second timeout
                )
                # Test connection
                client.admin.command('ping')
            except (ConnectionFailure, ServerSelectionTimeoutError) as e:
                # Release the unusable client before retrying or giving up.
                if client is not None:
                    client.close()
                logger.error(f"❌ Failed to connect to MongoDB (attempt {attempt}/{max_retries}): {e}")
                if attempt >= max_retries:
                    logger.error("Maximum retries reached. Could not connect to MongoDB.")
                    raise
                # Wait before retrying (exponential backoff)
                time.sleep(2 ** attempt)
            except Exception:
                # Any other failure is not retried; release the client and propagate.
                if client is not None:
                    client.close()
                raise
            else:
                self.client = client
                self.db = client[self.database_name]
                logger.info("✅ MongoDB connection initialized successfully")
                logger.info(f"📊 Connected to database: {self.database_name}")
                return

    def test_connection(self) -> bool:
        """
        Test the MongoDB connection by sending a ``ping`` command.

        Returns:
            bool: True if connection successful, False otherwise
        """
        try:
            self.client.admin.command('ping')
            logger.info("✅ MongoDB connection test successful")
            return True
        except Exception as e:
            logger.error(f"❌ MongoDB connection test failed: {e}")
            return False

    def get_collections_info(self) -> Dict[str, Any]:
        """
        Get information about available collections.

        Returns:
            Dictionary with the database name, a per-collection mapping of
            document ``count`` and ``indexes``, and ``total_collections``;
            ``{"error": <message>}`` if anything fails.
        """
        try:
            collections = self.db.list_collection_names()
            collection_info = {}
            for collection_name in collections:
                collection = self.db[collection_name]
                collection_info[collection_name] = {
                    "count": collection.count_documents({}),
                    "indexes": list(collection.list_indexes()),
                }
            return {
                "database": self.database_name,
                "collections": collection_info,
                "total_collections": len(collections),
            }
        except Exception as e:
            logger.error(f"❌ Error getting collections info: {e}")
            return {"error": str(e)}

    def fetch_all_sessions(self, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Fetch sessions from the database, newest ``createdAt`` first.

        Args:
            limit: Maximum number of sessions to return

        Returns:
            List of dictionaries containing session data; empty on error
        """
        try:
            cursor = self.db.sessions.find({}).sort("createdAt", -1).limit(limit)
            # Transform MongoDB documents to MCP format
            transformed_sessions = [self._transform_session(session) for session in cursor]
            logger.info(f"📊 Fetched {len(transformed_sessions)} sessions from database")
            return transformed_sessions
        except Exception as e:
            logger.error(f"❌ Error fetching sessions: {e}")
            return []

    @staticmethod
    def _session_lookup_filter(session_id: str) -> Dict[str, Any]:
        """Build the filter matching a session by ``_id`` or by ``code``."""
        candidates: List[Dict[str, Any]] = [{"_id": session_id}, {"code": session_id}]
        # The transforms expose ids as str(_id); when _id is stored as an
        # ObjectId the raw string never matches, so also try the typed value.
        if isinstance(session_id, str) and ObjectId.is_valid(session_id):
            candidates.append({"_id": ObjectId(session_id)})
        return {"$or": candidates}

    def fetch_session_by_id(self, session_id: str) -> Optional[Dict[str, Any]]:
        """
        Fetch a specific session by ID or by session code.

        Args:
            session_id: Session ``_id`` (as stored, or the string form of an
                ObjectId) or session ``code``

        Returns:
            Dictionary with session data or None if not found or on error
        """
        try:
            session = self.db.sessions.find_one(self._session_lookup_filter(session_id))
            if session:
                return self._transform_session(session)
            return None
        except Exception as e:
            logger.error(f"❌ Error fetching session {session_id}: {e}")
            return None

    def fetch_ideas_by_session(self, session_id: str, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Fetch ideas for a specific session, newest ``createdAt`` first.

        Args:
            session_id: Session ID to fetch ideas for
            limit: Maximum number of ideas to return

        Returns:
            List of dictionaries containing idea data; empty on error
        """
        try:
            cursor = self.db.ideas.find({"sessionId": session_id}).sort("createdAt", -1).limit(limit)
            # Transform MongoDB documents to MCP format
            transformed_ideas = [self._transform_idea(idea) for idea in cursor]
            logger.info(f"📊 Fetched {len(transformed_ideas)} ideas for session {session_id}")
            return transformed_ideas
        except Exception as e:
            logger.error(f"❌ Error fetching ideas for session {session_id}: {e}")
            return []

    @staticmethod
    def _build_idea_search_filter(session_id: str, query: str) -> Dict[str, Any]:
        """Build the case-insensitive substring filter used by ``search_ideas``."""
        # Escape the query so it is matched literally: regex metacharacters in
        # caller input must not change the match, break the pattern, or
        # trigger catastrophic backtracking.
        pattern = re.escape(query)
        return {
            "sessionId": session_id,
            "$or": [
                {"text": {"$regex": pattern, "$options": "i"}},
                {"name": {"$regex": pattern, "$options": "i"}},
            ],
        }

    def search_ideas(self, session_id: str, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """
        Search ideas whose ``text`` or ``name`` contains the query.

        The query is treated as literal text and matched case-insensitively.

        Args:
            session_id: Session ID to search in
            query: Search query
            limit: Maximum number of results

        Returns:
            List of matching ideas, newest ``createdAt`` first; empty on error
        """
        try:
            search_query = self._build_idea_search_filter(session_id, query)
            cursor = self.db.ideas.find(search_query).sort("createdAt", -1).limit(limit)
            # Transform MongoDB documents to MCP format
            transformed_ideas = [self._transform_idea(idea) for idea in cursor]
            logger.info(f"🔍 Found {len(transformed_ideas)} ideas matching '{query}' in session {session_id}")
            return transformed_ideas
        except Exception as e:
            logger.error(f"❌ Error searching ideas: {e}")
            return []

    def fetch_idea_connections(self, session_id: str, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Fetch idea connections for a session.

        Args:
            session_id: Session ID to fetch connections for
            limit: Maximum number of connections to return

        Returns:
            List of idea connections; empty on error
        """
        try:
            cursor = self.db.ideaconnections.find({"sessionId": session_id}).limit(limit)
            # Transform MongoDB documents to MCP format
            transformed_connections = [self._transform_connection(connection) for connection in cursor]
            logger.info(f"📊 Fetched {len(transformed_connections)} connections for session {session_id}")
            return transformed_connections
        except Exception as e:
            logger.error(f"❌ Error fetching connections for session {session_id}: {e}")
            return []

    def fetch_idea_clusters(self, session_id: str) -> List[Dict[str, Any]]:
        """
        Fetch idea clusters for a session.

        Args:
            session_id: Session ID to fetch clusters for

        Returns:
            List of idea clusters; empty on error
        """
        try:
            cursor = self.db.ideaclusters.find({"sessionId": session_id})
            # Transform MongoDB documents to MCP format
            transformed_clusters = [self._transform_cluster(cluster) for cluster in cursor]
            logger.info(f"📊 Fetched {len(transformed_clusters)} clusters for session {session_id}")
            return transformed_clusters
        except Exception as e:
            logger.error(f"❌ Error fetching clusters for session {session_id}: {e}")
            return []

    def get_session_stats(self, session_id: str) -> Dict[str, Any]:
        """
        Get statistics for a session.

        Args:
            session_id: Session ID to get stats for

        Returns:
            Dictionary with the session name, status, creation time and the
            number of ideas, connections and clusters whose ``sessionId``
            equals ``session_id``; ``{"error": ...}`` if the session is not
            found or a query fails.
        """
        try:
            # Get session info
            session = self.fetch_session_by_id(session_id)
            if not session:
                return {"error": "Session not found"}
            by_session = {"sessionId": session_id}
            return {
                "session_id": session_id,
                "session_name": session.get("name", "Unknown"),
                "ideas_count": self.db.ideas.count_documents(by_session),
                "connections_count": self.db.ideaconnections.count_documents(by_session),
                "clusters_count": self.db.ideaclusters.count_documents(by_session),
                "created_at": session.get("createdAt"),
                "status": session.get("status", "unknown"),
            }
        except Exception as e:
            logger.error(f"❌ Error getting session stats for {session_id}: {e}")
            return {"error": str(e)}

    @staticmethod
    def _format_timestamp(value: Any) -> str:
        """
        Render a stored timestamp as a string.

        ``datetime`` values become ISO 8601 strings, a missing or ``None``
        value becomes ``""`` (not the string ``"None"``), and anything else
        is passed through ``str``.
        """
        if value is None:
            return ""
        if isinstance(value, datetime):
            return value.isoformat()
        return str(value)

    def _transform_session(self, session: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform MongoDB session document to MCP format.

        Args:
            session: Raw MongoDB session document

        Returns:
            Transformed dictionary with a string ``id``, string timestamps
            and defaults filled in for missing fields
        """
        return {
            "id": str(session.get("_id", "")),
            "name": session.get("name", ""),
            "code": session.get("code", ""),
            "adminId": session.get("adminId", ""),
            "status": session.get("status", "active"),
            "createdAt": self._format_timestamp(session.get("createdAt")),
            "updatedAt": self._format_timestamp(session.get("updatedAt")),
            "settings": session.get("settings", {}),
            "metadata": session.get("metadata", {}),
        }

    def _transform_idea(self, idea: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform MongoDB idea document to MCP format.

        Args:
            idea: Raw MongoDB idea document

        Returns:
            Transformed dictionary with a string ``id``, string timestamps
            and defaults filled in for missing fields
        """
        return {
            "id": str(idea.get("_id", "")),
            "sessionId": idea.get("sessionId", ""),
            "text": idea.get("text", ""),
            "name": idea.get("name", ""),
            "createdAt": self._format_timestamp(idea.get("createdAt")),
            "updatedAt": self._format_timestamp(idea.get("updatedAt")),
            "embedding": idea.get("embedding", []),
            "tags": idea.get("tags", []),
            "metadata": idea.get("metadata", {}),
        }

    def _transform_connection(self, connection: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform MongoDB connection document to MCP format.

        Args:
            connection: Raw MongoDB connection document

        Returns:
            Transformed dictionary with a string ``id``, a string timestamp
            and defaults filled in for missing fields
        """
        return {
            "id": str(connection.get("_id", "")),
            "sessionId": connection.get("sessionId", ""),
            "ideaA": connection.get("ideaA", ""),
            "ideaB": connection.get("ideaB", ""),
            "similarity": connection.get("similarity", 0.0),
            "createdAt": self._format_timestamp(connection.get("createdAt")),
            "metadata": connection.get("metadata", {}),
        }

    def _transform_cluster(self, cluster: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform MongoDB cluster document to MCP format.

        Args:
            cluster: Raw MongoDB cluster document

        Returns:
            Transformed dictionary with a string ``id``, a string timestamp
            and defaults filled in for missing fields
        """
        return {
            "id": str(cluster.get("_id", "")),
            "sessionId": cluster.get("sessionId", ""),
            "name": cluster.get("name", ""),
            "ideaIds": cluster.get("ideaIds", []),
            "centroid": cluster.get("centroid", []),
            "createdAt": self._format_timestamp(cluster.get("createdAt")),
            "metadata": cluster.get("metadata", {}),
        }

    def close_connection(self):
        """Close MongoDB connection, logging (not raising) any error."""
        try:
            if self.client:
                self.client.close()
                logger.info("🔌 MongoDB connection closed")
        except Exception as e:
            logger.error(f"Error closing MongoDB connection: {e}")
# Process-wide connector, created lazily on the first get_mongodb_connector() call
_mongodb_connector_instance = None


def get_mongodb_connector() -> MongoDBConnector:
    """
    Return the shared MongoDBConnector, creating it on first use.

    The instance is cached in a module-level variable; no locking is
    performed around its creation.

    Returns:
        MongoDBConnector instance
    """
    global _mongodb_connector_instance
    if _mongodb_connector_instance is not None:
        return _mongodb_connector_instance
    _mongodb_connector_instance = MongoDBConnector()
    return _mongodb_connector_instance
# Manual smoke test
def test_mongodb_connection():
    """Ping MongoDB through the shared connector and print a short summary.

    Prints the database name, the number of collections and up to five
    sessions (showing the first one). Any exception is caught and printed
    rather than raised.
    """
    try:
        connector = get_mongodb_connector()

        # Bail out early when the ping fails
        reachable = connector.test_connection()
        if not reachable:
            print("❌ Connection test failed")
            return

        # Summarise the database
        info = connector.get_collections_info()
        print(f"📊 Database: {info.get('database', 'Unknown')}")
        print(f"📊 Collections: {info.get('total_collections', 0)}")

        # Pull a handful of sessions as sample data
        recent_sessions = connector.fetch_all_sessions(limit=5)
        print(f"✅ Successfully fetched {len(recent_sessions)} sessions")
        if not recent_sessions:
            return
        print("\n📋 Sample session:")
        print(recent_sessions[0])
    except Exception as exc:
        print(f"❌ Test failed: {exc}")


if __name__ == "__main__":
    # Executed as a script: run the smoke test
    test_mongodb_connection()