Skip to main content
Glama
email_data_extractor.py (14.9 kB)
"""Simplified email data extraction with single comprehensive mode.""" # Type imports from typing import Any, Dict, Optional # Local application imports from .email_utils import _format_recipient_for_display from .logging_config import get_logger from .outlook_session.session_manager import OutlookSessionManager from .shared import email_cache, email_cache_order from .utils import OutlookItemClass, safe_encode_text from .validation import ( AttachmentType, BatchLimits, BodyFormat, FlagStatus, Importance, Sensitivity ) logger = get_logger(__name__) def extract_comprehensive_email_data(email: Dict[str, Any]) -> Dict[str, Any]: """Extract comprehensive email data with single mode - always return full text content.""" # Start with basic email data sender = email.get("sender", "Unknown Sender") if isinstance(sender, dict): sender_name = sender.get("name", "Unknown Sender") else: sender_name = str(sender) result = { "id": email.get("id", email.get("entry_id", "")), "entry_id": email.get("id", email.get("entry_id", "")), "subject": email.get("subject", "No Subject"), "sender": sender_name, "from": sender_name, # Alias for compatibility "received_time": email.get("received_time", ""), "received": email.get("received_time", ""), # Alias for compatibility "unread": email.get("unread", False), "has_attachments": email.get("has_attachments", False), "size": email.get("size", 0), "to": ( ", ".join([_format_recipient_for_display(r) for r in email.get("to_recipients", [])]) if email.get("to_recipients") else "" ), "cc": ( ", ".join([_format_recipient_for_display(r) for r in email.get("cc_recipients", [])]) if email.get("cc_recipients") else "" ), "body": email.get("body", ""), # Include cached body if available "attachments": email.get("attachments", []), # Include cached attachments if available "attachments_count": len(email.get("attachments", [])), # Count of real attachments } # Always attempt to get comprehensive content from Outlook try: with OutlookSessionManager() as 
session: if not session or not session.namespace: logger.error("Failed to establish Outlook session") return result if not hasattr(session.namespace, 'GetItemFromID'): logger.error("Namespace does not have GetItemFromID method") return result item = session.namespace.GetItemFromID(email.get("entry_id", email.get("id", ""))) if not item or item.Class != OutlookItemClass.MAIL_ITEM: logger.warning(f"Email not found or not a mail item") return result # Extract all available text content result["body"] = safe_encode_text(getattr(item, "Body", ""), "body") result["html_body"] = safe_encode_text(getattr(item, "HTMLBody", ""), "html_body") if hasattr(item, "HTMLBody") else "" result["body_format"] = getattr(item, "BodyFormat", 1) # 1=Plain, 2=HTML, 3=RichText # Extract attachment details if not already cached if hasattr(item, 'Attachments') and item.Attachments and item.Attachments.Count > 0: attachments = [] try: for i in range(item.Attachments.Count): attachment = item.Attachments.Item(i + 1) file_name = getattr(attachment, 'FileName', getattr(attachment, 'DisplayName', 'Unknown')) # Check if it's an embedded image is_embedded = False # Method 1: Check Content-ID property (most reliable for embedded images) try: if hasattr(attachment, 'PropertyAccessor'): content_id = attachment.PropertyAccessor.GetProperty("http://schemas.microsoft.com/mapi/proptag/0x3712001F") is_embedded = content_id is not None and len(str(content_id).strip()) > 0 # Also check for Content-Location property (another indicator of embedded content) if not is_embedded: try: content_location = attachment.PropertyAccessor.GetProperty("http://schemas.microsoft.com/mapi/proptag/0x3713001F") is_embedded = content_location is not None and len(str(content_location).strip()) > 0 except: pass except: pass # Method 2: Check attachment type - embedded attachments are usually Type 4 (OLE) or Type 3 (Embedded) attachment_type = getattr(attachment, 'Type', AttachmentType.BY_VALUE) if attachment_type in 
[AttachmentType.EMBEDDED, AttachmentType.OLE]: is_embedded = True # Method 3: Check if it's an image with suspicious naming patterns is_image = file_name.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg', '.ico')) if is_image and not is_embedded: # Check for common embedded image naming patterns lower_name = file_name.lower() if any(pattern in lower_name for pattern in ['image', 'img', 'cid:', 'embedded']): is_embedded = True # Check if filename is just numbers with extension (common for embedded images) elif '.' in lower_name: name_without_ext = lower_name.rsplit('.', 1)[0] if name_without_ext.isdigit() or (len(name_without_ext) <= 2 and name_without_ext.isalnum()): is_embedded = True # Method 4: Check if attachment size is suspiciously small for an image (embedded images are often smaller) if is_image and not is_embedded: try: attachment_size = getattr(attachment, 'Size', 0) # Embedded images are typically smaller than regular image attachments if attachment_size > 0 and attachment_size < 50000: # Less than 50KB # Additional check: if it's a very small image, more likely to be embedded if attachment_size < 10000: # Less than 10KB is_embedded = True except: pass # PDF files and other documents are always considered real attachments is_document = file_name.lower().endswith(('.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.zip', '.rar')) if is_document: is_embedded = False # Only add non-embedded attachments to the list if not is_embedded: attachment_info = { "name": file_name, "size": getattr(attachment, 'Size', 0), "type": getattr(attachment, 'Type', AttachmentType.BY_VALUE) } attachments.append(attachment_info) # Update has_attachments flag and attachments list result["attachments"] = attachments result["has_attachments"] = len(attachments) > 0 result["attachments_count"] = len(attachments) except Exception as e: logger.debug(f"Error extracting attachment details: {e}") result["attachments"] = [] result["has_attachments"] = 
False result["attachments_count"] = 0 # Enhanced metadata result["importance"] = getattr(item, "Importance", 1) # 0=Low, 1=Normal, 2=High result["sensitivity"] = getattr(item, "Sensitivity", 0) # 0=Normal, 1=Personal, 2=Private, 3=Confidential result["conversation_topic"] = safe_encode_text(getattr(item, "ConversationTopic", ""), "conversation_topic") result["conversation_id"] = getattr(item, "ConversationID", "") result["categories"] = getattr(item, "Categories", "") result["flag_status"] = getattr(item, "FlagStatus", 0) # 0=Unflagged, 1=Flagged, 2=Complete except Exception as e: logger.error(f"Error loading email details: {e}") # Return basic data on error pass return result def extract_basic_email_data(email: Dict[str, Any]) -> Dict[str, Any]: """Extract email data with full text but without embedded images and attachments (renamed from text_only).""" # Start with comprehensive data but filter out attachments and embedded images comprehensive_data = extract_comprehensive_email_data(email) # Remove attachments and embedded images comprehensive_data["attachments"] = [] comprehensive_data["has_attachments"] = False # Keep all text content but ensure no embedded images in HTML body if comprehensive_data.get("html_body"): # Simple regex to remove img tags (basic HTML cleaning) import re comprehensive_data["html_body"] = re.sub(r'<img[^>]*>', '', comprehensive_data["html_body"]) return comprehensive_data # Removed text_only function and get_conversation_thread function - now basic mode uses the text-only logic def create_basic_email_response(email: Dict[str, Any]) -> Dict[str, Any]: """Create basic email response from cached data only.""" sender = email.get("sender", "Unknown Sender") if isinstance(sender, dict): sender_name = sender.get("name", "Unknown Sender") else: sender_name = str(sender) return { "id": email.get("id", ""), "subject": email.get("subject", "No Subject"), "sender": sender_name, "received_time": email.get("received_time", ""), "unread": 
email.get("unread", False), "has_attachments": email.get("has_attachments", False), "size": email.get("size", 0), "body": email.get("body", ""), "to": ( ", ".join([_format_recipient_for_display(r) for r in email.get("to_recipients", [])]) if email.get("to_recipients") else "" ), "cc": ( ", ".join([_format_recipient_for_display(r) for r in email.get("cc_recipients", [])]) if email.get("cc_recipients") else "" ), "attachments": email.get("attachments", []), } def get_email_by_number_unified(email_number: int, mode: str = "basic", include_attachments: bool = True, embed_images: bool = True) -> Optional[Dict[str, Any]]: """Get email by number from cache with unified interface. Args: email_number: The number of the email in the cache (1-based) mode: Retrieval mode - "basic", "enhanced", "lazy" include_attachments: Whether to include attachment content embed_images: Whether to embed inline images Returns: Email data dictionary or None if not found """ if not isinstance(email_number, int) or email_number < 1: return None # Check if cache is loaded if not email_cache or not email_cache_order: return None # Validate email number if email_number > len(email_cache_order): return None # Get email ID from cache order email_id = email_cache_order[email_number - 1] # Get email from cache email_data = email_cache.get(email_id) if not email_data: return None # Extract email data based on mode if mode == "basic": return extract_basic_email_data(email_data) # This is the new text-only mode (renamed) else: # enhanced, lazy, or any other mode return extract_comprehensive_email_data(email_data) def format_email_with_media(email_data: Dict[str, Any]) -> str: """Format email with media information for enhanced display.""" formatted_text = f"Subject: {email_data.get('subject', 'N/A')}\n" formatted_text += f"From: {email_data.get('from', 'N/A')}\n" formatted_text += f"To: {email_data.get('to', 'N/A')}\n" formatted_text += f"Date: {email_data.get('received', 'N/A')}\n" # Add conversation 
topic if available if email_data.get("conversation_topic"): formatted_text += f"Conversation: {email_data.get('conversation_topic', 'N/A')}\n" formatted_text += f"Body: {email_data.get('body', 'N/A')}\n" # Add HTML body if available and different from plain body if email_data.get("html_body") and email_data.get("html_body") != email_data.get("body"): formatted_text += f"HTML Body: {email_data.get('html_body', 'N/A')}\n" # Add attachments if present and mode allows it if email_data.get("attachments") and email_data.get("has_attachments", False): formatted_text += f"\nAttachments: {len(email_data['attachments'])}\n" for attachment in email_data["attachments"]: formatted_text += f" - {attachment.get('name', 'Unknown')}" if attachment.get('size'): formatted_text += f" ({attachment['size']} bytes)" if attachment.get('content_base64'): content_length = len(attachment['content_base64']) formatted_text += f" [Base64 content: {content_length} characters]" formatted_text += "\n" # Add metadata if available if email_data.get("importance") is not None: importance_map = {0: "Low", 1: "Normal", 2: "High"} formatted_text += f"Importance: {importance_map.get(email_data.get('importance'), 'Normal')}\n" if email_data.get("categories"): formatted_text += f"Categories: {email_data.get('categories', 'N/A')}\n" return formatted_text

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/marlonluo2018/outlook-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.