server.py
""" Outlook MCP Server - Main MCP server implementation """ import asyncio import json import os import sys from datetime import datetime, timedelta from pathlib import Path from typing import Optional # Load .env file from project root from dotenv import load_dotenv env_path = Path(__file__).parent.parent.parent.parent / ".env" if env_path.exists(): load_dotenv(env_path) from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent from .config import get_config from .outlook_reader import get_outlook_reader, OutlookReader from .email_indexer import get_indexer, run_full_index, get_embedding_model from .attachment_parser import ( get_attachment_parser, extract_attachment_text, PYMUPDF_AVAILABLE, TESSERACT_AVAILABLE, ANTHROPIC_AVAILABLE, GOOGLE_GENAI_AVAILABLE ) # Pre-load embedding model at startup (takes ~1 minute) print("Loading embedding model... (this may take about 1 minute)", file=sys.stderr) _embedding_model = get_embedding_model() print("Embedding model loaded!", file=sys.stderr) # Create MCP server app = Server("outlook-mcp") def _process_attachment_batch(collection, model, attachments_data, config): """Process a batch of attachments with batch embedding Args: collection: ChromaDB collection for attachments model: Embedding model attachments_data: List of (email_entry_id, filename, text_content) tuples config: Config object """ import hashlib indexed = 0 skipped = 0 errors = [] # Prepare documents documents = [] valid_attachments = [] for email_entry_id, filename, text_content in attachments_data: # Generate attachment ID att_id = hashlib.sha256(f"{email_entry_id}_{filename}".encode()).hexdigest()[:32] # Check if already indexed try: existing = collection.get(ids=[att_id]) if existing and existing["ids"]: skipped += 1 continue except: pass # Limit text length text = text_content[:10000] if text_content else "" if not text.strip(): skipped += 1 continue documents.append(f"Filename: {filename}\n\n{text}") valid_attachments.append((att_id, email_entry_id, filename)) if not documents: return indexed, skipped, errors # Batch encode try: embeddings = model.encode(documents).tolist() for i, (att_id, email_entry_id, filename) in enumerate(valid_attachments): try: metadata = { "email_entry_id": email_entry_id, "filename": filename[:200], "type": "attachment", } collection.add( ids=[att_id], embeddings=[embeddings[i]], documents=[documents[i][:50000]], # Increased for Vision AI metadatas=[metadata], ) indexed += 1 except Exception as e: errors.append(str(e)[:100]) skipped += 1 except Exception as e: errors.append(f"Attachment batch encoding error: {str(e)[:100]}") skipped += len(valid_attachments) return indexed, skipped, errors def _process_email_batch(collection, model, emails): """Process a batch of emails with batch embedding""" import hashlib indexed = 0 skipped = 0 errors = [] # Prepare documents for batch encoding documents = [] valid_emails = [] for email in emails: # Generate email ID email_id = hashlib.sha256(email.entry_id.encode()).hexdigest()[:32] # Check if already indexed try: existing = collection.get(ids=[email_id]) if existing and existing["ids"]: skipped += 1 continue except: pass # Prepare document parts = [] if email.subject: parts.append(f"Subject: {email.subject}") parts.append(email.subject) if email.sender_name: parts.append(f"From: {email.sender_name}") if email.recipients: parts.append(f"To: {', '.join(email.recipients[:5])}") if email.received_time: try: parts.append(f"Date: 
{email.received_time.strftime('%Y-%m-%d')}") except: pass if email.body: parts.append(email.body[:2000]) # Limit body length document = "\n".join(parts) if document: documents.append(document) valid_emails.append((email, email_id)) if not documents: return indexed, skipped, errors # Batch encode all documents try: embeddings = model.encode(documents).tolist() # Add to collection for i, (email, email_id) in enumerate(valid_emails): try: metadata = { "entry_id": email.entry_id, "subject": email.subject[:500] if email.subject else "", "sender_name": email.sender_name[:200] if email.sender_name else "", "sender_email": email.sender_email[:200] if email.sender_email else "", "folder_path": email.folder_path[:200] if email.folder_path else "", "received_time": email.received_time.isoformat() if email.received_time else "", "has_attachments": email.has_attachments, } collection.add( ids=[email_id], embeddings=[embeddings[i]], documents=[documents[i][:50000]], # Limit document size (increased for Vision AI) metadatas=[metadata], ) indexed += 1 except Exception as e: errors.append(str(e)[:100]) skipped += 1 except Exception as e: errors.append(f"Batch encoding error: {str(e)[:100]}") skipped += len(valid_emails) return indexed, skipped, errors @app.list_tools() async def list_tools() -> list[Tool]: """List available tools""" return [ # Search Tools Tool( name="email_search", description="Search emails using natural language query. Returns relevant emails based on semantic similarity to your query.", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Natural language search query (e.g., 'BIM coordination meeting', 'project deadline discussion')" }, "limit": { "type": "integer", "description": "Maximum number of results (default: 10)", "default": 10 }, "sender": { "type": "string", "description": "Optional: filter by sender name" }, "days_back": { "type": "integer", "description": "Optional: only search emails from the last N days" }, "folder": { "type": "string", "description": "Optional: filter by folder path" } }, "required": ["query"] } ), Tool( name="email_search_by_sender", description="Find all emails from a specific sender", inputSchema={ "type": "object", "properties": { "sender": { "type": "string", "description": "Sender name or email to search for" }, "limit": { "type": "integer", "description": "Maximum results (default: 20)", "default": 20 }, "query": { "type": "string", "description": "Optional: additional search terms to narrow results" } }, "required": ["sender"] } ), Tool( name="email_search_by_date", description="Search emails within a date range", inputSchema={ "type": "object", "properties": { "start_date": { "type": "string", "description": "Start date (YYYY-MM-DD format)" }, "end_date": { "type": "string", "description": "End date (YYYY-MM-DD format)" }, "query": { "type": "string", "description": "Optional: search query to filter results" }, "limit": { "type": "integer", "description": "Maximum results (default: 20)", "default": 20 } }, "required": ["start_date"] } ), Tool( name="email_search_attachments", description="Search within email attachment contents (PDF, Word, Excel files)", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Search query for attachment content" }, "limit": { "type": "integer", "description": "Maximum results (default: 10)", "default": 10 } }, "required": ["query"] } ), # Management Tools Tool( name="email_index_status", description="Check the current email indexing 
status and statistics", inputSchema={ "type": "object", "properties": {} } ), Tool( name="email_index_refresh", description="Index new emails that haven't been indexed yet. This is incremental - only adds new emails.", inputSchema={ "type": "object", "properties": { "days_back": { "type": "integer", "description": "Number of days to look back for new emails (default: 7)", "default": 7 } } } ), Tool( name="email_index_rebuild", description="Rebuild the entire email index. Warning: This can take a long time for large mailboxes.", inputSchema={ "type": "object", "properties": { "period": { "type": "string", "description": "Time period to index: '1week', '1month', '3months', '6months', '1year', '2years', '3years', 'all'", "enum": ["1week", "1month", "3months", "6months", "1year", "2years", "3years", "all"], "default": "1year" }, "confirm": { "type": "boolean", "description": "Must be true to confirm rebuild" }, "vision_provider": { "type": "string", "description": "Vision AI for image analysis: 'ocr' (default, free), 'gemini' (free, rate-limited), 'claude' (paid, best quality)", "enum": ["ocr", "gemini", "claude"], "default": "ocr" } }, "required": ["confirm"] } ), # Retrieval Tools Tool( name="email_get_detail", description="Get full details of a specific email by its ID", inputSchema={ "type": "object", "properties": { "entry_id": { "type": "string", "description": "The email's entry ID (from search results)" } }, "required": ["entry_id"] } ), Tool( name="email_list_folders", description="List all email folders in Outlook", inputSchema={ "type": "object", "properties": { "include_system": { "type": "boolean", "description": "Include system/hidden folders (default: false)", "default": False } } } ), # Connection Tools Tool( name="email_connection_status", description="Check Outlook connection status", inputSchema={ "type": "object", "properties": {} } ), ] @app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: """Handle tool calls""" try: # Search Tools if name == "email_search": query = arguments.get("query", "") limit = arguments.get("limit", 10) sender = arguments.get("sender") days_back = arguments.get("days_back") folder = arguments.get("folder") if not query: return [TextContent(type="text", text=json.dumps({ "success": False, "error": "Query is required" }, indent=2))] # Semantic search using pre-loaded embedding model try: # Use pre-loaded model query_embedding = _embedding_model.encode(query).tolist() # Get ChromaDB collection import chromadb from chromadb.config import Settings config = get_config() client = chromadb.PersistentClient( path=config.db_path, settings=Settings(anonymized_telemetry=False), ) collection = client.get_or_create_collection(name="emails") # Query with embedding search_results = collection.query( query_embeddings=[query_embedding], n_results=limit * 2, # Get more to account for filtering include=["documents", "metadatas", "distances"], ) if not search_results or not search_results.get("ids") or not search_results["ids"][0]: return [TextContent(type="text", text=json.dumps({ "success": True, "query": query, "result_count": 0, "results": [], "search_type": "semantic", "note": "No indexed emails. Run indexing first." 
}, indent=2))] # Process results results = [] seen_entries = set() for i, doc_id in enumerate(search_results["ids"][0]): metadata = search_results["metadatas"][0][i] if search_results.get("metadatas") else {} document = search_results["documents"][0][i] if search_results.get("documents") else "" distance = search_results["distances"][0][i] if search_results.get("distances") else 0 entry_id = metadata.get("entry_id", "") # Skip duplicates (chunked emails) if entry_id in seen_entries: continue subject = metadata.get("subject", "") sender_name = metadata.get("sender_name", "") # Apply sender filter if sender and sender.lower() not in sender_name.lower(): continue # Apply days filter if days_back: received_str = metadata.get("received_time", "") if received_str: try: received = datetime.fromisoformat(received_str.replace("+00:00", "")) cutoff = datetime.now() - timedelta(days=days_back) if received < cutoff: continue except: pass # Convert distance to similarity score (0-1, higher is better) similarity = 1 / (1 + distance) seen_entries.add(entry_id) results.append({ "entry_id": entry_id, "subject": subject, "sender_name": sender_name, "sender_email": metadata.get("sender_email", ""), "received_time": metadata.get("received_time", ""), "has_attachments": metadata.get("has_attachments", False), "relevance_score": round(similarity, 4), "preview": document[:200] if document else "", }) if len(results) >= limit: break except Exception as e: return [TextContent(type="text", text=json.dumps({ "success": False, "error": str(e) }, indent=2))] return [TextContent(type="text", text=json.dumps({ "success": True, "query": query, "search_type": "semantic", "result_count": len(results), "results": results }, indent=2, ensure_ascii=False))] elif name == "email_search_by_sender": sender = arguments.get("sender", "") limit = arguments.get("limit", 20) query = arguments.get("query", "") if not sender: return [TextContent(type="text", text=json.dumps({ "success": False, "error": "Sender is required" }, indent=2))] # Use sender as part of query if no specific query given search_query = f"{query} from {sender}" if query else f"emails from {sender}" indexer = get_indexer() results = indexer.search_emails( query=search_query, limit=limit, sender_filter=sender, ) return [TextContent(type="text", text=json.dumps({ "success": True, "sender_filter": sender, "result_count": len(results), "results": results }, indent=2, ensure_ascii=False))] elif name == "email_search_by_date": start_date_str = arguments.get("start_date", "") end_date_str = arguments.get("end_date") query = arguments.get("query", "") limit = arguments.get("limit", 20) if not start_date_str: return [TextContent(type="text", text=json.dumps({ "success": False, "error": "start_date is required (YYYY-MM-DD format)" }, indent=2))] try: date_from = datetime.strptime(start_date_str, "%Y-%m-%d") if end_date_str: # Set to end of day (23:59:59) to include all emails on that day date_to = datetime.strptime(end_date_str, "%Y-%m-%d").replace(hour=23, minute=59, second=59) else: date_to = datetime.now() except ValueError: return [TextContent(type="text", text=json.dumps({ "success": False, "error": "Invalid date format. 
Use YYYY-MM-DD" }, indent=2))] # If no query, use a generic one search_query = query if query else "email" indexer = get_indexer() results = indexer.search_emails( query=search_query, limit=limit, date_from=date_from, date_to=date_to, ) return [TextContent(type="text", text=json.dumps({ "success": True, "date_range": { "from": start_date_str, "to": end_date_str or "now" }, "result_count": len(results), "results": results }, indent=2, ensure_ascii=False))] elif name == "email_search_attachments": query = arguments.get("query", "") limit = arguments.get("limit", 10) if not query: return [TextContent(type="text", text=json.dumps({ "success": False, "error": "Query is required" }, indent=2))] # Use pre-loaded embedding model for semantic search try: import chromadb from chromadb.config import Settings config = get_config() client = chromadb.PersistentClient( path=config.db_path, settings=Settings(anonymized_telemetry=False), ) try: attachments_collection = client.get_collection(name="attachments") except: return [TextContent(type="text", text=json.dumps({ "success": True, "query": query, "result_count": 0, "results": [], "note": "No attachments indexed. Run index rebuild first." }, indent=2))] # Generate query embedding using pre-loaded model query_embedding = _embedding_model.encode(query).tolist() # Query attachments collection search_results = attachments_collection.query( query_embeddings=[query_embedding], n_results=limit * 2, include=["documents", "metadatas", "distances"], ) if not search_results or not search_results.get("ids") or not search_results["ids"][0]: return [TextContent(type="text", text=json.dumps({ "success": True, "query": query, "result_count": 0, "results": [], "note": "No matching attachments found." }, indent=2))] # Process results results = [] seen_attachments = set() for i, att_id in enumerate(search_results["ids"][0]): metadata = search_results["metadatas"][0][i] if search_results.get("metadatas") else {} document = search_results["documents"][0][i] if search_results.get("documents") else "" distance = search_results["distances"][0][i] if search_results.get("distances") else 0 # Deduplicate if att_id in seen_attachments: continue seen_attachments.add(att_id) # Convert distance to similarity score similarity = 1 / (1 + distance) results.append({ "attachment_id": att_id, "email_entry_id": metadata.get("email_entry_id", ""), "filename": metadata.get("filename", ""), "relevance_score": round(similarity, 4), "matched_text": document[:300] if document else "", }) if len(results) >= limit: break except Exception as e: return [TextContent(type="text", text=json.dumps({ "success": False, "error": str(e) }, indent=2))] return [TextContent(type="text", text=json.dumps({ "success": True, "query": query, "search_type": "semantic", "result_count": len(results), "results": results }, indent=2, ensure_ascii=False))] # Management Tools elif name == "email_index_status": indexer = get_indexer() stats = indexer.get_index_stats() # Add Outlook status try: reader = get_outlook_reader() outlook_status = reader.get_connection_status() except Exception as e: outlook_status = {"connected": False, "error": str(e)} return [TextContent(type="text", text=json.dumps({ "success": True, "index_stats": stats, "outlook_status": outlook_status, "config": { "db_path": get_config().db_path, "embedding_model": get_config().embedding_model, } }, indent=2))] elif name == "email_index_refresh": days_back = arguments.get("days_back", 7) # Run indexing synchronously (to_thread causes issues with 
COM/embedding) result = run_full_index(since_days=days_back) return [TextContent(type="text", text=json.dumps({ "success": True, "message": f"Indexed {result.get('total_indexed', 0)} new emails", "details": result }, indent=2))] elif name == "email_index_rebuild": confirm = arguments.get("confirm", False) period = arguments.get("period", "1year") vision_provider = arguments.get("vision_provider", "ocr") # Convert period to days period_map = { "1week": 7, "1month": 30, "3months": 90, "6months": 180, "1year": 365, "2years": 730, "3years": 1095, "all": None # None means no limit } days_back = period_map.get(period, 365) if not confirm: return [TextContent(type="text", text=json.dumps({ "success": False, "error": "Must set confirm=true to rebuild index", "available_periods": list(period_map.keys()) }, indent=2))] # Run indexing with pre-loaded model and progress tracking import hashlib import chromadb from chromadb.config import Settings from .outlook_reader import OutlookReader config = get_config() # Initialize ChromaDB client = chromadb.PersistentClient( path=config.db_path, settings=Settings(anonymized_telemetry=False), ) # Clear existing index try: client.delete_collection("emails") except: pass try: client.delete_collection("attachments") except: pass collection = client.get_or_create_collection(name="emails") attachments_collection = client.get_or_create_collection(name="attachments") # Connect to Outlook reader = OutlookReader() status = reader.get_connection_status() if not status.get("connected"): return [TextContent(type="text", text=json.dumps({ "success": False, "error": f"Could not connect to Outlook: {status.get('error')}" }, indent=2))] since_date = datetime.now() - timedelta(days=days_back) if days_back else None folders = ["Inbox", "Sent Items"] # Progress tracking batch_size = 50 total_processed = 0 total_indexed = 0 total_skipped = 0 errors = [] progress_log = [] # Attachment tracking att_total = 0 att_indexed = 0 att_skipped = 0 attachment_batch = [] # (email_entry_id, filename, text_content) # Create temp directory for attachment extraction import tempfile import os temp_dir = tempfile.mkdtemp(prefix="outlook_mcp_att_") parser = get_attachment_parser() # Supported attachment extensions supported_exts = {".pdf", ".docx", ".xlsx", ".txt", ".png", ".jpg", ".jpeg"} progress_log.append(f"Attachment parsing: PDF={PYMUPDF_AVAILABLE}, OCR={TESSERACT_AVAILABLE}, Claude={ANTHROPIC_AVAILABLE}, Gemini={GOOGLE_GENAI_AVAILABLE}") progress_log.append(f"Vision provider: {vision_provider}") for folder_path in folders: progress_log.append(f"Processing folder: {folder_path}") email_batch = [] try: for email in reader.get_emails(folder_path=folder_path, since_date=since_date): total_processed += 1 email_batch.append(email) # Process attachments for this email if email.has_attachments and email.attachments: for att_idx, att_info in enumerate(email.attachments): filename = att_info.get("filename", "") ext = os.path.splitext(filename)[1].lower() if ext not in supported_exts: continue att_total += 1 try: # Save attachment to temp directory saved_path = reader.save_attachment( email.entry_id, att_idx, temp_dir ) if saved_path: # Extract text (use vision_provider for images) result = parser.parse_file(saved_path, vision_provider=vision_provider) if result.get("success") and result.get("text"): attachment_batch.append(( email.entry_id, filename, result["text"] )) # Clean up temp file try: os.remove(saved_path) except: pass except Exception as e: errors.append(f"Attachment {filename}: 
{str(e)[:50]}") # Process email batch when full if len(email_batch) >= batch_size: indexed, skipped, batch_errors = _process_email_batch( collection, _embedding_model, email_batch ) total_indexed += indexed total_skipped += skipped errors.extend(batch_errors) progress_log.append( f" Batch complete: {total_indexed} emails indexed (total: {total_processed})" ) email_batch = [] # Process attachment batch when full if len(attachment_batch) >= 20: a_indexed, a_skipped, a_errors = _process_attachment_batch( attachments_collection, _embedding_model, attachment_batch, config ) att_indexed += a_indexed att_skipped += a_skipped errors.extend(a_errors) progress_log.append( f" Attachments batch: {att_indexed} indexed" ) attachment_batch = [] # Process remaining emails if email_batch: indexed, skipped, batch_errors = _process_email_batch( collection, _embedding_model, email_batch ) total_indexed += indexed total_skipped += skipped errors.extend(batch_errors) progress_log.append( f" Final batch: {total_indexed} emails indexed (total: {total_processed})" ) except Exception as e: errors.append(f"Folder {folder_path}: {str(e)[:100]}") progress_log.append(f" Error: {str(e)[:100]}") # Process remaining attachments if attachment_batch: a_indexed, a_skipped, a_errors = _process_attachment_batch( attachments_collection, _embedding_model, attachment_batch, config ) att_indexed += a_indexed att_skipped += a_skipped errors.extend(a_errors) # Clean up temp directory try: import shutil shutil.rmtree(temp_dir, ignore_errors=True) except: pass progress_log.append(f"=== Indexing Complete ===") progress_log.append(f"Emails: {total_indexed} indexed, {total_skipped} skipped") progress_log.append(f"Attachments: {att_indexed} indexed out of {att_total} found") period_label = "all time" if period == "all" else period return [TextContent(type="text", text=json.dumps({ "success": True, "message": f"Rebuilt index with {total_indexed} emails and {att_indexed} attachments ({period_label})", "period": period, "stats": { "emails_processed": total_processed, "emails_indexed": total_indexed, "emails_skipped": total_skipped, "attachments_found": att_total, "attachments_indexed": att_indexed, "attachments_skipped": att_skipped, "errors_count": len(errors), }, "progress_log": progress_log, "errors": errors[:10] if errors else [] }, indent=2, ensure_ascii=False))] # Retrieval Tools elif name == "email_get_detail": entry_id = arguments.get("entry_id", "") if not entry_id: return [TextContent(type="text", text=json.dumps({ "success": False, "error": "entry_id is required" }, indent=2))] reader = get_outlook_reader() email = reader.get_email_by_id(entry_id) if email is None: return [TextContent(type="text", text=json.dumps({ "success": False, "error": "Email not found" }, indent=2))] return [TextContent(type="text", text=json.dumps({ "success": True, "email": email.to_dict() }, indent=2, ensure_ascii=False))] elif name == "email_list_folders": include_system = arguments.get("include_system", False) reader = get_outlook_reader() folders = reader.list_folders(include_system=include_system) return [TextContent(type="text", text=json.dumps({ "success": True, "folder_count": len(folders), "folders": folders }, indent=2, ensure_ascii=False))] elif name == "email_connection_status": reader = get_outlook_reader() status = reader.get_connection_status() return [TextContent(type="text", text=json.dumps({ "success": True, "status": status }, indent=2))] else: return [TextContent(type="text", text=json.dumps({ "success": False, "error": f"Unknown tool: 
{name}" }, indent=2))] except Exception as e: return [TextContent(type="text", text=json.dumps({ "success": False, "error": str(e) }, indent=2))] async def main(): """Run the server""" async with stdio_server() as (read_stream, write_stream): await app.run( read_stream, write_stream, app.create_initialization_options() ) if __name__ == "__main__": asyncio.run(main())
