Email Processing MCP Server

by Cam10001110101
Verified
  • src
import logging
import time
from typing import List, Dict, Any, Optional

from pymongo import MongoClient
from pymongo.collection import Collection
from pymongo.errors import BulkWriteError, DuplicateKeyError

# Configure logging
logger = logging.getLogger('outlook-email.mongodb')


class MongoDBHandler:
    """Wrapper around a single MongoDB collection holding email embeddings.

    Each stored document has the keys ``id``, ``embedding``, ``document`` and
    ``metadata``. A unique index on ``id`` is created at construction time, so
    one entry id maps to at most one stored document.

    The handler can be used as a context manager; the client connection is
    closed on exit (and again, harmlessly, on garbage collection).
    """

    # Keys every incoming embedding dict must provide.
    _REQUIRED_FIELDS = ('id', 'embedding', 'document', 'metadata')

    # Server error code MongoDB reports for a unique-index violation.
    _DUPLICATE_KEY_CODE = 11000

    def __init__(self, connection_string: str, collection_name: str) -> None:
        """
        Initialize the MongoDBHandler with the connection string and collection name.

        Args:
            connection_string (str): MongoDB connection string
            collection_name (str): Name of the collection to manage

        Raises:
            Exception: Any error raised while connecting, resolving the
                database/collection or creating the index is logged and
                re-raised unchanged.
        """
        try:
            logger.info("Initializing MongoDB connection")
            self.client = MongoClient(connection_string)
            # get_database() is called without a name, so the database is the
            # one selected by the connection string.
            self.db = self.client.get_database()
            self.collection_name = collection_name
            self.collection = self._get_or_create_collection()

            # Create index on id field
            self.collection.create_index("id", unique=True)
            logger.info("MongoDB initialized successfully")
        except Exception as e:
            logger.error("Error initializing MongoDB: %s", e, exc_info=True)
            # Release the client right away instead of waiting for garbage
            # collection; close() never raises.
            self.close()
            raise

    def _get_or_create_collection(self, max_retries: int = 3) -> Collection:
        """Get or create collection with retry logic.

        Args:
            max_retries (int): Number of attempts before the last error is
                re-raised.

        Returns:
            Collection: Handle for ``self.collection_name`` in ``self.db``.
        """
        for attempt in range(max_retries):
            try:
                return self.db[self.collection_name]
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                logger.warning(
                    "Retry %d/%d getting collection: %s",
                    attempt + 1, max_retries, e,
                )
                time.sleep(1)  # Wait before retry

    def add_embeddings(self, embeddings: List[Dict[str, Any]], job_id: Optional[str] = None) -> bool:
        """
        Add embeddings to the MongoDB collection.

        Embeddings whose id is already stored are skipped. The input dicts are
        not modified; sanitized copies are what gets inserted.

        Args:
            embeddings (List[Dict]): List of embeddings to add. Each one must
                contain 'id', 'embedding', 'document' and 'metadata'.
            job_id (str, optional): Job ID for tracking. Accepted for
                interface compatibility; not used by this method.

        Returns:
            bool: True if the new embeddings were added (or there was nothing
            new to add), False on any failure, including an embedding that is
            missing a required field.
        """
        try:
            documents = self._prepare_new_documents(embeddings)

            if not documents:
                logger.info("No new embeddings to add")
                return True

            logger.info("Adding %d embeddings to MongoDB", len(documents))
            return self._insert_documents(documents)

        except Exception as e:
            logger.error("Error adding embeddings: %s", e, exc_info=True)
            return False

    def _prepare_new_documents(self, embeddings: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Validate embeddings and convert the not-yet-stored ones to documents.

        Raises:
            ValueError: If any embedding lacks one of the required fields.
        """
        documents = []
        for embedding in embeddings:
            if not all(k in embedding for k in self._REQUIRED_FIELDS):
                raise ValueError("Missing required fields in embedding")

            try:
                entry_id = str(embedding['id'])
                # Filter out embeddings with existing IDs
                if self.email_exists(entry_id):
                    continue
                documents.append({
                    'id': entry_id,
                    'embedding': embedding['embedding'],
                    'document': embedding['document'],
                    'metadata': self._sanitize_metadata(embedding['metadata']),
                })
            except Exception as e:
                # A malformed entry (e.g. non-dict metadata) is skipped so the
                # rest of the batch can still be stored.
                logger.warning("Error checking email existence: %s", e)
                continue
        return documents

    @staticmethod
    def _sanitize_metadata(metadata: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return a copy of metadata with lists/dicts stringified and None -> ''."""
        if metadata is None:
            return {}

        sanitized = {}
        for key, value in metadata.items():
            if isinstance(value, (list, dict)):
                sanitized[key] = str(value)
            elif value is None:
                sanitized[key] = ''
            else:
                sanitized[key] = value
        return sanitized

    def _is_duplicate_only_error(self, error: Exception) -> bool:
        """Tell whether an insert error consists solely of duplicate-key failures."""
        if isinstance(error, DuplicateKeyError):
            return True
        if isinstance(error, BulkWriteError):
            write_errors = (error.details or {}).get('writeErrors', [])
            return bool(write_errors) and all(
                item.get('code') == self._DUPLICATE_KEY_CODE
                for item in write_errors
            )
        return False

    def _insert_documents(self, documents: List[Dict[str, Any]], max_retries: int = 3) -> bool:
        """Insert documents, retrying on errors other than duplicate keys."""
        for attempt in range(max_retries):
            try:
                # insert_many reports write failures as BulkWriteError.
                # ordered=False lets the server continue past a duplicate, so
                # one duplicate does not block the rest of the batch.
                self.collection.insert_many(documents, ordered=False)
                logger.info("Successfully added embeddings to MongoDB")
                return True
            except Exception as e:
                if self._is_duplicate_only_error(e):
                    logger.warning("Duplicate key found, skipping those documents")
                    return True
                if attempt == max_retries - 1:
                    logger.error(
                        "Failed to add embeddings after %d attempts: %s",
                        max_retries, e,
                    )
                    return False
                logger.warning(
                    "Retry %d/%d adding embeddings: %s",
                    attempt + 1, max_retries, e,
                )
                time.sleep(1)  # Wait before retry
        return False

    def email_exists(self, entry_id: str) -> bool:
        """
        Check if an email entry exists.

        Args:
            entry_id (str): ID of the email entry

        Returns:
            bool: True if exists. A lookup error is logged and reported as
            False.
        """
        try:
            result = self.collection.find_one({'id': str(entry_id)})
            return result is not None
        except Exception as e:
            logger.error("Error checking email existence: %s", e)
            return False

    def get_collection_count(self) -> int:
        """
        Get the count of documents in the collection.

        Returns:
            int: Number of documents, or 0 if the count could not be obtained.
        """
        try:
            return self.collection.count_documents({})
        except Exception as e:
            logger.error("Error getting collection count: %s", e)
            return 0

    def get_metadata(self, entry_id: str) -> Optional[Dict[str, Any]]:
        """
        Get metadata for a specific entry.

        Args:
            entry_id (str): ID of the entry

        Returns:
            Optional[Dict[str, Any]]: The metadata if found, otherwise None
            (also None when the lookup fails).
        """
        try:
            result = self.collection.find_one({'id': str(entry_id)})
            if result:
                return result.get('metadata')
            return None
        except Exception as e:
            logger.error("Error getting metadata: %s", e)
            return None

    def close(self) -> None:
        """Close the MongoDB connection.

        Safe to call repeatedly and on a partially initialized instance;
        errors are logged, never raised.
        """
        try:
            client = getattr(self, 'client', None)
            if client is not None:
                client.close()
                # Drop the reference so later calls (__exit__, then __del__)
                # do not close and log a second time.
                self.client = None
                logger.info("MongoDB connection closed")
        except Exception as e:
            logger.error("Error closing MongoDB connection: %s", e, exc_info=True)

    def __del__(self) -> None:
        """Destructor to ensure connection is closed when object is garbage collected."""
        self.close()

    def __enter__(self) -> 'MongoDBHandler':
        """Enter context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit context manager and close connection."""
        self.close()