Email Processing MCP Server

by Cam10001110101
Verified
  • src
#!/usr/bin/env python3
"""MCP server that ingests Outlook emails into SQLite and an embedding store.

The module reads its configuration from environment variables, builds a single
module-level ``EmailProcessor`` at import time, and exposes one MCP tool,
``process_emails``.
"""
import atexit
import os
import sys
import logging
from typing import Dict, Any, List, Optional

# Configure logging
# Logs go to stderr rather than stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    stream=sys.stderr
)

# Environment variables are set by the MCP config file
from datetime import datetime
from fastmcp import FastMCP, Context
from MongoDBHandler import MongoDBHandler
from SQLiteHandler import SQLiteHandler
from OutlookConnector import OutlookConnector
from EmailMetadata import EmailMetadata
from langchain_ollama import OllamaEmbeddings
from debug_utils import dump_email_debug

# Initialize FastMCP server with dependencies
mcp = FastMCP(
    "outlook-email",
    dependencies=[
        "pymongo",
        "langchain",
        "langchain_ollama",
        "pywin32"
    ]
)

# Progress values reported by this module are percentages in the range 0-100.
_PROGRESS_TOTAL = 100

# Configuration keys that must be present and non-empty.
_REQUIRED_CONFIG_VARS = (
    "MONGODB_URI",
    "SQLITE_DB_PATH",
    "EMBEDDING_BASE_URL",
    "EMBEDDING_MODEL",
    "COLLECTION_NAME",
)


def validate_config(config: Dict[str, str]) -> None:
    """Validate required configuration values.

    Args:
        config: Configuration mapping. It is modified in place:
            ``PROCESS_DELETED_ITEMS`` is set to ``"false"`` when absent.

    Raises:
        ValueError: If any required key is missing or has a falsy value.
    """
    missing_vars = [var for var in _REQUIRED_CONFIG_VARS if not config.get(var)]
    if missing_vars:
        raise ValueError(f"Missing required configuration: {', '.join(missing_vars)}")

    # Set default values for optional configuration
    config.setdefault("PROCESS_DELETED_ITEMS", "false")


def _redact_mongodb_uri(uri: str) -> str:
    """Return ``uri`` with any ``user:password@`` credentials removed.

    The scheme (``mongodb://``, ``mongodb+srv://``, ...) and everything after
    the credentials are preserved. The split happens on the *last* ``@`` that
    precedes the query string, so an unescaped ``@`` inside a password does
    not leak part of the password, and an ``@`` inside the query string does
    not cause a credential-free URI to be mangled.
    """
    scheme, scheme_sep, rest = uri.partition("://")
    if not scheme_sep:
        # No scheme present; treat the whole string as the location part.
        scheme, rest = "", uri
    location, query_sep, query = rest.partition("?")
    _, at_sign, host_and_path = location.rpartition("@")
    if not at_sign:
        return uri
    return f"{scheme}{scheme_sep}{host_and_path}{query_sep}{query}"


async def _report_progress(ctx: Optional[Context], progress: float, message: str) -> None:
    """Send a numeric progress update and a status message to the client.

    ``Context.report_progress`` takes the numeric ``total`` as its second
    positional argument, so the human-readable text must not be passed there.
    The text is sent as a client log message via ``ctx.info`` instead.
    Nothing is sent when no context is available.
    """
    if ctx is None:
        return
    await ctx.report_progress(progress, _PROGRESS_TOTAL)
    await ctx.info(message)


class EmailProcessor:
    """Pull emails from Outlook, store them in SQLite and embed them."""

    def __init__(self, config: Dict[str, str]):
        """
        Initialize the email processor with configuration.

        Args:
            config: Dictionary containing configuration values:
                - MONGODB_URI: MongoDB connection string
                - SQLITE_DB_PATH: Path to SQLite database
                - EMBEDDING_BASE_URL: Base URL for embeddings
                - EMBEDDING_MODEL: Model to use for embeddings
                - COLLECTION_NAME: Name of the MongoDB collection to use
                - PROCESS_DELETED_ITEMS (optional): "true" to also process
                  the "Deleted Items" folder; defaults to "false"
        """
        self.config = config
        self.collection_name = config["COLLECTION_NAME"]

        # Initialize embedding processor
        # NOTE(review): EMBEDDING_BASE_URL / EMBEDDING_MODEL are not passed
        # here; presumably EmbeddingProcessor reads them itself - confirm.
        from tools.embedding_processor import EmbeddingProcessor
        self.embedding_processor = EmbeddingProcessor(
            db_path=config["MONGODB_URI"],
            collection_name=self.collection_name
        )

        # Initialize SQLite handler
        self.sqlite = SQLiteHandler(config["SQLITE_DB_PATH"])

        # Initialize Outlook connector with deleted items setting
        process_deleted = config.get("PROCESS_DELETED_ITEMS", "false").lower() == "true"
        self.outlook = OutlookConnector(process_deleted_items=process_deleted)

    def _resolve_mailboxes(self, mailboxes: List[str]) -> list:
        """Return the Outlook mailbox objects matching the requested names.

        ``"All"`` anywhere in ``mailboxes`` selects every mailbox; otherwise
        names for which the connector returns ``None`` are dropped.
        """
        if "All" in mailboxes:
            return self.outlook.get_mailboxes()
        candidates = (self.outlook.get_mailbox(name) for name in mailboxes)
        return [mailbox for mailbox in candidates if mailbox is not None]

    async def process_emails(
        self,
        start_date: str,
        end_date: str,
        mailboxes: List[str],
        ctx: Context
    ) -> Dict[str, Any]:
        """Process emails from the specified date range and mailboxes.

        Args:
            start_date: Range start, parseable by ``datetime.fromisoformat``.
            end_date: Range end, parseable by ``datetime.fromisoformat``.
            mailboxes: Mailbox names, or a list containing ``"All"``.
            ctx: MCP request context used for progress reporting; ``None``
                disables progress reporting.

        Returns:
            A dict with ``"success"``. On success it also carries
            ``"processed_count"`` and ``"message"``; on failure it carries
            ``"error"``. Exceptions are not propagated; they are converted
            into a failure dict.
        """
        try:
            # Convert dates
            start = datetime.fromisoformat(start_date)
            end = datetime.fromisoformat(end_date)

            # Validate date range
            if end < start:
                raise ValueError("End date cannot be before start date")
            if (end - start).days > 30:
                raise ValueError("Date range cannot exceed 30 days")

            # Get mailboxes
            await _report_progress(ctx, 0, "Initializing email processing")
            outlook_mailboxes = self._resolve_mailboxes(mailboxes)
            if not outlook_mailboxes:
                return {
                    "success": False,
                    "error": "No valid mailboxes found"
                }

            # Include Deleted Items folder if enabled
            folder_names = ["Inbox", "Sent Items"]
            if self.outlook.process_deleted_items:
                folder_names.append("Deleted Items")

            await _report_progress(
                ctx, 10, f"Retrieving emails from {', '.join(folder_names)}"
            )

            all_emails = []
            mailbox_count = len(outlook_mailboxes)
            for position, mailbox in enumerate(outlook_mailboxes, start=1):
                try:
                    emails = self.outlook.get_emails_within_date_range(
                        folder_names,
                        start.isoformat(),
                        end.isoformat(),
                        [mailbox]
                    )
                except Exception:
                    # Best effort: one broken mailbox must not abort the run,
                    # but the failure is recorded instead of being hidden.
                    logging.exception(
                        "Failed to retrieve emails from mailbox %d/%d; skipping",
                        position, mailbox_count
                    )
                    continue

                if emails:
                    all_emails.extend(emails)
                progress = 10 + (40 * position / mailbox_count)
                await _report_progress(
                    ctx, progress, f"Processing mailbox {position}/{mailbox_count}"
                )

            if not all_emails:
                return {
                    "success": False,
                    "error": "No emails found in any mailbox"
                }

            await _report_progress(ctx, 50, "Storing emails in SQLite")

            total_stored = 0
            email_count = len(all_emails)
            for position, email in enumerate(all_emails, start=1):
                if self.sqlite.add_or_update_email(email):
                    total_stored += 1
                progress = 50 + (20 * position / email_count)
                await _report_progress(
                    ctx, progress, f"Storing email {position}/{email_count}"
                )

            if total_stored == 0:
                return {
                    "success": False,
                    "error": "Failed to store any emails in SQLite"
                }

            email_dicts = list(self.sqlite.get_unprocessed_emails())

            await _report_progress(ctx, 70, "Processing embeddings")

            if not email_dicts:
                return {
                    "success": True,
                    "processed_count": 0,
                    "message": "No new emails to process"
                }

            total_processed, total_failed = self.embedding_processor.process_batch(email_dicts)

            await _report_progress(ctx, 90, "Finalizing processing")

            # NOTE(review): this marks the FIRST `total_processed` emails as
            # processed, which is only correct if process_batch's failures are
            # always at the tail of the batch - confirm against
            # EmbeddingProcessor.process_batch.
            for email in email_dicts[:total_processed]:
                self.sqlite.mark_as_processed(email['id'])

            result = {
                "success": True,
                "processed_count": total_processed,
                "message": (f"Successfully processed {total_processed} emails "
                            f"(retrieved: {len(all_emails)}, stored: {total_stored}, "
                            f"failed: {total_failed})")
            }

            await _report_progress(ctx, 100, "Processing complete")
            return result

        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
        # Note: We don't close connections here because they might be needed for future operations
        # Connections will be closed by the atexit handler when the server shuts down


try:
    # Load configuration from environment
    config = {
        "MONGODB_URI": os.environ.get("MONGODB_URI"),
        "SQLITE_DB_PATH": os.environ.get("SQLITE_DB_PATH"),
        "EMBEDDING_BASE_URL": os.environ.get("EMBEDDING_BASE_URL"),
        "EMBEDDING_MODEL": os.environ.get("EMBEDDING_MODEL"),
        "COLLECTION_NAME": os.environ.get("COLLECTION_NAME"),
        "PROCESS_DELETED_ITEMS": os.environ.get("PROCESS_DELETED_ITEMS", "false")
    }

    # Log environment variables for debugging
    logging.info("Environment variables:")

    # Redact sensitive information from MongoDB URI
    mongodb_uri = os.environ.get('MONGODB_URI', '')
    if mongodb_uri:
        logging.info("MONGODB_URI: %s", _redact_mongodb_uri(mongodb_uri))

    logging.info("SQLITE_DB_PATH: %s", os.environ.get('SQLITE_DB_PATH'))
    logging.info("EMBEDDING_BASE_URL: %s", os.environ.get('EMBEDDING_BASE_URL'))
    logging.info("EMBEDDING_MODEL: %s", os.environ.get('EMBEDDING_MODEL'))
    logging.info("COLLECTION_NAME: %s", os.environ.get('COLLECTION_NAME'))
    logging.info("PROCESS_DELETED_ITEMS: %s", os.environ.get('PROCESS_DELETED_ITEMS', 'false'))

    # Validate configuration
    validate_config(config)

    processor = EmailProcessor(config)
except Exception:
    # Startup cannot continue without a processor; record why before failing.
    logging.exception("Failed to initialize email processor")
    raise


# Register cleanup handler for server shutdown
def cleanup_resources():
    """Clean up resources when the server shuts down.

    Each connection is closed in its own try block so that a failure closing
    one does not prevent the other from being closed.
    """
    if 'processor' not in globals():
        return

    # Close SQLite connection
    if hasattr(processor, 'sqlite'):
        try:
            processor.sqlite.close()
            logging.info("SQLite connection closed during shutdown")
        except Exception as e:
            logging.error(f"Error during cleanup: {str(e)}")

    # Close MongoDB connection
    embedding_processor = getattr(processor, 'embedding_processor', None)
    if embedding_processor is not None and hasattr(embedding_processor, 'mongodb_handler'):
        try:
            embedding_processor.mongodb_handler.close()
            logging.info("MongoDB connection closed during shutdown")
        except Exception as e:
            logging.error(f"Error during cleanup: {str(e)}")


atexit.register(cleanup_resources)


# The docstring below is left as in the original because FastMCP exposes it
# to clients as the tool description.
@mcp.tool()
async def process_emails(
    start_date: str,
    end_date: str,
    mailboxes: List[str],
    ctx: Context = None
) -> str:
    """
    Process emails from specified date range and mailboxes.

    Args:
        start_date: Start date in ISO format (YYYY-MM-DD)
        end_date: End date in ISO format (YYYY-MM-DD)
        mailboxes: List of mailbox names or ["All"] for all mailboxes
    """
    try:
        # Validate date formats up front so the caller gets a format-specific
        # error instead of the raw parser message.
        try:
            datetime.fromisoformat(start_date)
            datetime.fromisoformat(end_date)
        except ValueError:
            return "Error: Dates must be in ISO format (YYYY-MM-DD)"

        result = await processor.process_emails(start_date, end_date, mailboxes, ctx)
        if result["success"]:
            return result["message"]
        return f"Error processing emails: {result['error']}"

    except Exception as e:
        return f"Error: {str(e)}"


if __name__ == "__main__":
    # Run the server
    mcp.run()