# Email Processing MCP Server
# by Cam10001110101
# Verified
import json
import uuid
import os
import sys
import time
import logging
from typing import List, Dict, Any, Tuple
from datetime import datetime
from langchain_ollama import OllamaEmbeddings
# Module-level logger shared by the embedding code in this file.
logger = logging.getLogger("outlook-email.embedding")
class EmbeddingProcessor:
    """Embed email dictionaries and persist the results through a MongoDB handler.

    Emails are validated, rendered to text, embedded with ``OllamaEmbeddings``
    (configured from the ``EMBEDDING_MODEL`` and ``EMBEDDING_BASE_URL``
    environment variables) and handed to ``MongoDBHandler.add_embeddings``.

    The retry settings below are class attributes so they can be tuned per
    instance or subclass without changing any method signature.
    """

    # Fields every email must carry (present and not None) to be embedded.
    REQUIRED_FIELDS = (
        'Subject', 'SenderName', 'SenderEmailAddress', 'To',
        'ReceivedTime', 'Folder', 'AccountName', 'Body',
    )
    # Fields copied into the metadata stored next to each embedding.
    METADATA_FIELDS = (
        'Subject', 'SenderName', 'SenderEmailAddress', 'To',
        'ReceivedTime', 'Folder', 'AccountName',
    )
    MAX_EMBED_RETRIES = 3   # attempts per chunk when generating embeddings
    EMBED_RETRY_DELAY = 2   # seconds slept between embedding attempts
    MAX_STORE_RETRIES = 3   # attempts per chunk when writing to MongoDB
    STORE_RETRY_DELAY = 1   # seconds slept between MongoDB attempts

    def __init__(self, db_path: str, collection_name: str):
        """
        Initialize the embedding processor.

        Args:
            db_path: Path to storage
            collection_name: Name of the collection to use

        Raises:
            Exception: Whatever ``MongoDBHandler`` or ``OllamaEmbeddings``
                raise during construction is propagated unchanged.
        """
        # Import here to avoid circular imports
        from MongoDBHandler import MongoDBHandler

        # Initialize MongoDB handler
        self.mongodb_handler = MongoDBHandler(db_path, collection_name)

        # Initialize Ollama embeddings
        try:
            self.embeddings = OllamaEmbeddings(
                model=os.getenv("EMBEDDING_MODEL"),
                base_url=os.getenv("EMBEDDING_BASE_URL"),
            )
        except Exception:
            # Record the failure before handing it to the caller.
            logger.exception("Failed to initialize Ollama embeddings")
            raise

    def create_email_content(self, email: Dict[str, Any]) -> str:
        """Create a formatted string of email content for embedding.

        The result starts and ends with a newline and holds one header per
        line (Subject, From, To, Date) followed by the body. Missing keys are
        rendered as empty strings.
        """
        lines = [
            "",
            f"Subject: {email.get('Subject', '')}",
            f"From: {email.get('SenderName', '')} <{email.get('SenderEmailAddress', '')}>",
            f"To: {email.get('To', '')}",
            f"Date: {email.get('ReceivedTime', '')}",
            f"{email.get('Body', '')}",
            "",
        ]
        return "\n".join(lines)

    def validate_email_data(self, email: Dict[str, Any]) -> bool:
        """Validate email data structure and content.

        Returns:
            True when every required field is present and not None and a
            non-empty ``ReceivedTime`` parses with ``datetime.fromisoformat``;
            False otherwise.
        """
        # Check required fields exist and are not None
        for field in self.REQUIRED_FIELDS:
            if field not in email or email[field] is None:
                return False

        # Validate dates are in ISO format (an empty value is accepted as-is)
        received = email['ReceivedTime']
        if received:
            try:
                datetime.fromisoformat(received)
            except (ValueError, TypeError):
                return False
        return True

    def process_batch(self, emails: List[Dict[str, Any]], batch_size: int = 4) -> Tuple[int, int]:
        """
        Process a batch of emails to generate embeddings with validation.

        Valid emails are split into chunks of ``batch_size`` documents. Each
        chunk is embedded and stored on its own, with retries, so one failing
        chunk does not discard the others.

        Args:
            emails: List of email dictionaries to process
            batch_size: Size of batches for processing (default: 4). A value
                that is not a positive integer makes all documents go through
                as one chunk.

        Returns:
            Tuple[int, int]: (number of successfully processed emails, number of failed emails)
        """
        ids, documents, metadatas, failed_count = self._prepare_documents(emails)
        if not documents:
            return 0, failed_count

        if isinstance(batch_size, int) and batch_size > 0:
            chunk_size = batch_size
        else:
            chunk_size = len(documents)

        processed_count = 0
        for start in range(0, len(documents), chunk_size):
            stop = start + chunk_size
            chunk_documents = documents[start:stop]
            try:
                stored = self._process_chunk(
                    ids[start:stop], chunk_documents, metadatas[start:stop]
                )
            except Exception as e:
                logger.error("Error processing batch: %s", e)
                stored = 0
            processed_count += stored
            failed_count += len(chunk_documents) - stored

        return processed_count, failed_count

    def _prepare_documents(self, emails: List[Dict[str, Any]]) -> Tuple[List[Any], List[str], List[Dict[str, Any]], int]:
        """Validate emails and build parallel id/document/metadata lists plus a failure count."""
        ids = []
        documents = []
        metadatas = []
        failed_count = 0

        for index, email in enumerate(emails):
            try:
                if not self.validate_email_data(email):
                    logger.warning("Skipping email at index %d: validation failed", index)
                    failed_count += 1
                    continue

                content = self.create_email_content(email)
                metadata = {field: email.get(field, '') for field in self.METADATA_FIELDS}
                # Metadata must be JSON encodable; a failure here is handled
                # by the except clause below like any other bad email.
                json.dumps(metadata)

                # Only fall back to a generated id when none was supplied;
                # an explicit None or empty id would otherwise be stored.
                email_id = email.get('id')
                if email_id is None or email_id == '':
                    email_id = str(uuid.uuid4())
            except Exception as e:
                logger.warning("Skipping email at index %d: %s", index, e)
                failed_count += 1
                continue

            ids.append(email_id)
            documents.append(content)
            metadatas.append(metadata)

        return ids, documents, metadatas, failed_count

    def _process_chunk(self, ids: List[Any], documents: List[str], metadatas: List[Dict[str, Any]]) -> int:
        """Embed and store one chunk; return the number of documents stored (all or none)."""
        embeddings = self._embed_with_retry(documents)
        if not embeddings:
            return 0

        # zip() would silently drop documents on a short result, and the
        # pairing of vectors to documents could no longer be trusted.
        if len(embeddings) != len(documents):
            logger.error(
                "Embedding count mismatch: expected %d, got %d",
                len(documents), len(embeddings),
            )
            return 0

        # Create batch of documents to add to MongoDB
        batch = [
            {'id': id_, 'embedding': emb, 'document': doc, 'metadata': meta}
            for id_, emb, doc, meta in zip(ids, embeddings, documents, metadatas)
        ]
        logger.info("Adding %d documents to MongoDB", len(batch))
        return len(batch) if self._store_with_retry(batch) else 0

    def _embed_with_retry(self, documents: List[str]) -> List[Any]:
        """Return embeddings for ``documents``, or an empty list once all attempts failed."""
        max_attempts = self.MAX_EMBED_RETRIES
        for attempt in range(1, max_attempts + 1):
            try:
                logger.info(
                    "Generating embeddings for %d documents (attempt %d/%d)",
                    len(documents), attempt, max_attempts,
                )
                embeddings = self.embeddings.embed_documents(documents)
            except Exception as e:
                logger.error(
                    "Error generating embeddings (attempt %d/%d): %s",
                    attempt, max_attempts, e,
                )
                if attempt < max_attempts:
                    time.sleep(self.EMBED_RETRY_DELAY)  # Wait before retry
                continue

            if not embeddings:
                logger.error("No embeddings generated")
                return []
            logger.info("Successfully generated %d embeddings", len(embeddings))
            return embeddings

        logger.error("Failed to generate embeddings after all retries")
        return []

    def _store_with_retry(self, batch: List[Dict[str, Any]]) -> bool:
        """Write ``batch`` through the MongoDB handler, retrying on a falsy result or exception."""
        max_attempts = self.MAX_STORE_RETRIES
        for attempt in range(1, max_attempts + 1):
            try:
                if self.mongodb_handler.add_embeddings(batch):
                    logger.info("Successfully added %d documents to MongoDB", len(batch))
                    return True
                logger.warning(
                    "Failed to add documents to MongoDB (attempt %d/%d)",
                    attempt, max_attempts,
                )
            except Exception as e:
                logger.error(
                    "Error adding documents to MongoDB (attempt %d/%d): %s",
                    attempt, max_attempts, e,
                )
            if attempt < max_attempts:
                time.sleep(self.STORE_RETRY_DELAY)
        return False

    def close(self) -> None:
        """Close connections.

        Safe to call more than once: after a successful close, later calls
        (including the one made by ``__del__``) return immediately.
        """
        if getattr(self, '_closed', False):
            return
        try:
            # The handler is absent when __init__ failed before creating it.
            handler = getattr(self, 'mongodb_handler', None)
            if handler is not None:
                handler.close()
                logger.info("MongoDB connection closed from EmbeddingProcessor")
            self._closed = True
        except Exception as e:
            logger.error("Error closing connections: %s", e)

    def __del__(self) -> None:
        """Destructor to ensure connections are closed when object is garbage collected."""
        try:
            self.close()
        except Exception:
            # A finalizer must not propagate; module globals such as the
            # logger may already be gone during interpreter shutdown.
            pass