search_common.py (23.4 kB)
""" Common utilities for email search operations. This module contains shared functions and utilities used across different email search implementations. """ # Standard library imports from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, Optional # Local application imports from ..logging_config import get_logger from ..validation import BatchProcessing logger = get_logger(__name__) def get_folder_path_safe(folder_name: Optional[str] = None) -> str: """Get safe folder path, defaulting to Inbox if not provided.""" return folder_name if folder_name else "Inbox" def get_date_limit(days: int) -> datetime: """Get the date limit for searching emails.""" from datetime import timezone return datetime.now(timezone.utc) - timedelta(days=days) def is_server_search_supported(search_type: str) -> bool: """Check if server-side search is supported for the given search type.""" return search_type in ["subject", "sender", "recipient"] # COM attribute cache to avoid repeated access - OPTIMIZED VERSION _com_attribute_cache = {} def _get_cached_com_attribute(item, attr_name, default=None): """Get COM attribute with caching to avoid repeated access.""" try: item_id = getattr(item, 'EntryID', '') if not item_id: return getattr(item, attr_name, default) cache_key = f"{item_id}:{attr_name}" if cache_key not in _com_attribute_cache: _com_attribute_cache[cache_key] = getattr(item, attr_name, default) return _com_attribute_cache[cache_key] except Exception: return default def clear_com_attribute_cache(): """Clear the COM attribute cache to prevent memory growth.""" global _com_attribute_cache _com_attribute_cache.clear() logger.debug("Cleared COM attribute cache") def extract_email_info_minimal(item) -> Dict[str, Any]: """Extract minimal email information for fast list operations.""" try: # Ultra-fast extraction with minimal COM access entry_id = getattr(item, 'EntryID', '') subject = getattr(item, 'Subject', 'No Subject') sender = getattr(item, 'SenderName', 'Unknown') received_time = getattr(item, 'ReceivedTime', None) # Extract To recipients - minimal version to_recipients = [] try: recipients = getattr(item, 'Recipients', None) if recipients: for recipient in recipients: if getattr(recipient, 'Type', 0) == 1: # 1 = To recipient recipient_info = { "address": getattr(recipient, 'Address', ''), "name": getattr(recipient, 'Name', '') } if recipient_info["address"] or recipient_info["name"]: to_recipients.append(recipient_info) except Exception as e: logger.debug(f"Error extracting To recipients in minimal mode: {e}") # Fallback to To field try: to_field = getattr(item, 'To', '') if to_field: to_list = str(to_field).split(';') for to_addr in to_list: to_addr = to_addr.strip() if to_addr: to_recipients.append({"address": to_addr, "name": to_addr}) except Exception: pass # Extract CC recipients - minimal version cc_recipients = [] try: recipients = getattr(item, 'Recipients', None) if recipients: for recipient in recipients: if getattr(recipient, 'Type', 0) == 2: # 2 = CC recipient recipient_info = { "address": getattr(recipient, 'Address', ''), "name": getattr(recipient, 'Name', '') } if recipient_info["address"] or recipient_info["name"]: cc_recipients.append(recipient_info) except Exception as e: logger.debug(f"Error extracting CC recipients in minimal mode: {e}") # Fallback to CC field try: cc_field = getattr(item, 'CC', '') if cc_field: cc_list = str(cc_field).split(';') for cc_addr in cc_list: cc_addr = cc_addr.strip() if cc_addr: cc_recipients.append({"address": cc_addr, "name": 
cc_addr}) except Exception: pass # Extract attachment information with embedded image detection has_attachments = False attachments_list = [] embedded_images_count = 0 try: attachments = getattr(item, 'Attachments', None) if attachments and hasattr(attachments, 'Count') and attachments.Count > 0: for i in range(attachments.Count): attachment = attachments.Item(i + 1) file_name = getattr(attachment, 'FileName', '') or getattr(attachment, 'DisplayName', 'Unknown') # Check if it's an embedded image is_embedded = False # Method 1: Check Content-ID property try: if hasattr(attachment, 'PropertyAccessor'): content_id = attachment.PropertyAccessor.GetProperty("http://schemas.microsoft.com/mapi/proptag/0x3712001F") is_embedded = content_id is not None and len(str(content_id).strip()) > 0 # Also check for Content-Location property if not is_embedded: try: content_location = attachment.PropertyAccessor.GetProperty("http://schemas.microsoft.com/mapi/proptag/0x3713001F") is_embedded = content_location is not None and len(str(content_location).strip()) > 0 except: pass except: pass # Method 2: Check attachment type attachment_type = getattr(attachment, 'Type', 1) if attachment_type in [3, 4]: # 3 = Embedded, 4 = OLE is_embedded = True # Method 3: Check for embedded image naming patterns is_image = file_name.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg', '.ico')) if is_image and not is_embedded: lower_name = file_name.lower() if any(pattern in lower_name for pattern in ['image', 'img', 'cid:', 'embedded']): is_embedded = True elif '.' in lower_name: name_without_ext = lower_name.rsplit('.', 1)[0] if name_without_ext.isdigit() or (len(name_without_ext) <= 2 and name_without_ext.isalnum()): is_embedded = True # Method 4: Check attachment size if is_image and not is_embedded: try: attachment_size = getattr(attachment, 'Size', 0) if attachment_size > 0 and attachment_size < 10000: # Less than 10KB is_embedded = True except: pass # Document files are always real attachments is_document = file_name.lower().endswith(('.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.zip', '.rar')) if is_document: is_embedded = False # Count embedded images if is_embedded and is_image: embedded_images_count += 1 # Only add non-embedded attachments to the list elif not is_embedded: attachment_info = { "name": file_name, "size": getattr(attachment, 'Size', 0), "type": getattr(attachment, 'Type', 1) } attachments_list.append(attachment_info) # Update has_attachments flag based on real attachments only has_attachments = len(attachments_list) > 0 except Exception as e: logger.debug(f"Error extracting attachments in minimal mode: {e}") has_attachments = False attachments_list = [] return { "entry_id": entry_id, "subject": subject, "sender": sender, "received_time": str(received_time) if received_time else "Unknown", "to_recipients": to_recipients, "cc_recipients": cc_recipients, "has_attachments": has_attachments, "attachments": attachments_list, "attachments_count": len(attachments_list), "embedded_images_count": embedded_images_count, "attachments_processed": True } except Exception as e: logger.debug(f"Error in minimal extraction: {e}") return { "entry_id": getattr(item, 'EntryID', ''), "subject": "No Subject", "sender": "Unknown", "received_time": "Unknown", "to_recipients": [], "cc_recipients": [], "has_attachments": False, "attachments": [], "attachments_count": 0, "embedded_images_count": 0, "attachments_processed": True } def extract_email_info(item) -> Dict[str, Any]: """Extract basic 
email information from an Outlook item with optimized COM access.""" # OPTIMIZATION: Bulk extract all basic attributes in single COM access try: # Extract all basic attributes at once to minimize COM calls entry_id = getattr(item, 'EntryID', '') subject = getattr(item, 'Subject', 'No Subject') sender = getattr(item, 'SenderName', 'Unknown') received_time = getattr(item, 'ReceivedTime', None) email_info = { "entry_id": entry_id, "subject": subject, "sender": sender, "received_time": str(received_time) if received_time else "Unknown" } # Cache these attributes for recipient processing if entry_id: _com_attribute_cache[f"{entry_id}:EntryID"] = entry_id _com_attribute_cache[f"{entry_id}:Subject"] = subject _com_attribute_cache[f"{entry_id}:SenderName"] = sender _com_attribute_cache[f"{entry_id}:ReceivedTime"] = received_time except Exception as e: logger.debug(f"Error extracting basic email info: {e}") # Fallback to safe defaults email_info = { "entry_id": getattr(item, 'EntryID', ''), "subject": "No Subject", "sender": "Unknown", "received_time": "Unknown" } # Extract To recipients - optimized with single-pass COM access try: to_recipients = [] # Use cached recipients collection access recipients = _get_cached_com_attribute(item, 'Recipients') if recipients: try: for recipient in recipients: if _get_cached_com_attribute(recipient, 'Type') == 1: # 1 = To recipient recipient_info = { "address": _get_cached_com_attribute(recipient, 'Address', ''), "name": _get_cached_com_attribute(recipient, 'Name', '') } if recipient_info["address"] or recipient_info["name"]: to_recipients.append(recipient_info) except Exception as e: logger.debug(f"Error extracting from Recipients collection: {e}") # Fallback to To field if Recipients collection didn't work if not to_recipients: to_field = _get_cached_com_attribute(item, 'To') if to_field: try: # Parse To field which might be a semicolon-separated string to_list = str(to_field).split(';') for to_addr in to_list: to_addr = to_addr.strip() if to_addr: to_recipients.append({"address": to_addr, "name": to_addr}) except Exception as e: logger.debug(f"Error extracting from To field: {e}") email_info["to_recipients"] = to_recipients except Exception as e: logger.debug(f"Error in To recipient extraction: {e}") email_info["to_recipients"] = [] # Extract CC recipients - optimized with single-pass COM access try: cc_recipients = [] # Use cached recipients collection access recipients = _get_cached_com_attribute(item, 'Recipients') if recipients: try: for recipient in recipients: if _get_cached_com_attribute(recipient, 'Type') == 2: # 2 = CC recipient recipient_info = { "address": _get_cached_com_attribute(recipient, 'Address', ''), "name": _get_cached_com_attribute(recipient, 'Name', '') } if recipient_info["address"] or recipient_info["name"]: cc_recipients.append(recipient_info) except Exception as e: logger.debug(f"Error extracting CC from Recipients collection: {e}") # Fallback to CC field if Recipients collection didn't work if not cc_recipients: cc_field = _get_cached_com_attribute(item, 'CC') if cc_field: try: # Parse CC field which might be a semicolon-separated string cc_list = str(cc_field).split(';') for cc_addr in cc_list: cc_addr = cc_addr.strip() if cc_addr: cc_recipients.append({"address": cc_addr, "name": cc_addr}) except Exception as e: logger.debug(f"Error extracting from CC field: {e}") email_info["cc_recipients"] = cc_recipients except Exception as e: logger.debug(f"Error in CC recipient extraction: {e}") email_info["cc_recipients"] = [] # Extract 
additional useful information with optimized COM access try: email_info["unread"] = _get_cached_com_attribute(item, 'UnRead', False) attachments = _get_cached_com_attribute(item, 'Attachments') has_attachments = attachments and hasattr(attachments, 'Count') and attachments.Count > 0 email_info["has_attachments"] = has_attachments # Extract attachment information if present if has_attachments: attachments_list = [] try: for i in range(attachments.Count): attachment = attachments.Item(i + 1) file_name = _get_cached_com_attribute(attachment, 'FileName') or _get_cached_com_attribute(attachment, 'DisplayName', 'Unknown') # Check if it's an embedded image is_embedded = False # Method 1: Check Content-ID property (most reliable for embedded images) try: property_accessor = _get_cached_com_attribute(attachment, 'PropertyAccessor') if property_accessor: content_id = property_accessor.GetProperty("http://schemas.microsoft.com/mapi/proptag/0x3712001F") is_embedded = content_id is not None and len(str(content_id).strip()) > 0 # Also check for Content-Location property if not is_embedded: try: content_location = property_accessor.GetProperty("http://schemas.microsoft.com/mapi/proptag/0x3713001F") is_embedded = content_location is not None and len(str(content_location).strip()) > 0 except: pass except: pass # Method 2: Check attachment type attachment_type = _get_cached_com_attribute(attachment, 'Type', 1) if attachment_type in [3, 4]: # 3 = Embedded, 4 = OLE is_embedded = True # Method 3: Check for embedded image naming patterns is_image = file_name.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg', '.ico')) if is_image and not is_embedded: lower_name = file_name.lower() if any(pattern in lower_name for pattern in ['image', 'img', 'cid:', 'embedded']): is_embedded = True elif '.' 
in lower_name: name_without_ext = lower_name.rsplit('.', 1)[0] if name_without_ext.isdigit() or (len(name_without_ext) <= 2 and name_without_ext.isalnum()): is_embedded = True # Method 4: Check attachment size if is_image and not is_embedded: try: attachment_size = _get_cached_com_attribute(attachment, 'Size', 0) if attachment_size > 0 and attachment_size < 10000: # Less than 10KB is_embedded = True except: pass # PDF files and documents are always real attachments is_document = file_name.lower().endswith(('.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.zip', '.rar')) if is_document: is_embedded = False # Only add non-embedded attachments to the list if not is_embedded: attachment_info = { "name": file_name, "size": _get_cached_com_attribute(attachment, 'Size', 0), "type": _get_cached_com_attribute(attachment, 'Type', 1) # 1 = ByValue, 2 = ByReference, 3 = Embedded, 4 = OLE } attachments_list.append(attachment_info) # Update has_attachments flag based on real attachments only email_info["has_attachments"] = len(attachments_list) > 0 email_info["attachments"] = attachments_list email_info["attachments_count"] = len(attachments_list) except Exception as e: logger.debug(f"Error extracting attachment details: {e}") email_info["attachments"] = [] email_info["has_attachments"] = False email_info["attachments_count"] = 0 else: email_info["attachments"] = [] email_info["has_attachments"] = False except Exception as e: logger.debug(f"Error extracting email metadata: {e}") email_info["unread"] = False email_info["has_attachments"] = False email_info["attachments"] = [] return email_info def unified_cache_load_workflow(emails_data: List[Dict[str, Any]], operation_name: str = "cache_operation") -> bool: """ Optimized unified cache loading workflow for all email tools. Implements the improved 3-step cache workflow with performance optimizations: 1. Clear both memory and disk cache 2. Load fresh data into memory (optimized batch processing) 3. 
Save to disk (optimized for small datasets) Args: emails_data: List of email dictionaries to load into cache operation_name: Name of the operation for logging purposes Returns: bool: True if cache loading was successful, False otherwise """ try: from ..shared import clear_email_cache, add_email_to_cache, immediate_save_cache # Minimal logging for performance if len(emails_data) > 100: logger.info(f"Starting cache workflow for {operation_name} with {len(emails_data)} emails") # Step 1: Clear both memory and disk cache for fresh start clear_email_cache() # Step 2: Load fresh data into memory with batch optimization emails_loaded = 0 # For small datasets, process directly without batching overhead if len(emails_data) <= 100: for email_data in emails_data: try: entry_id = email_data.get("entry_id") if entry_id: add_email_to_cache(entry_id, email_data) emails_loaded += 1 except Exception: continue else: # For larger datasets, use batch processing batch_size = BatchProcessing.DEFAULT_BATCH_SIZE for i in range(0, len(emails_data), batch_size): batch = emails_data[i:i + batch_size] for email_data in batch: try: entry_id = email_data.get("entry_id") if entry_id: add_email_to_cache(entry_id, email_data) emails_loaded += 1 except Exception: continue # Step 3: Save to disk with optimization for small datasets if emails_loaded > 0: # For small datasets (< 50 emails), delay disk save or skip if possible if len(emails_data) >= 50: immediate_save_cache() # For very small datasets, disk save is fast enough to proceed normally elif len(emails_data) > 0: immediate_save_cache() return emails_loaded > 0 except Exception as e: logger.error(f"Failed to execute unified cache loading workflow for {operation_name}: {e}") return False
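
The sketch below is not part of search_common.py; it illustrates how these helpers might be combined by a caller inside the same package. It assumes pywin32 (win32com) and a running Outlook instance, that the module sits next to search_common.py so the relative import resolves, and the hypothetical name load_recent_inbox_snapshot; 6 is the Outlook olFolderInbox constant.

# Hypothetical usage sketch, not part of the original file.
import win32com.client

from .search_common import (
    clear_com_attribute_cache,
    extract_email_info_minimal,
    unified_cache_load_workflow,
)


def load_recent_inbox_snapshot(max_items: int = 50) -> bool:
    """Extract minimal info for the newest Inbox items and load it into the email cache."""
    namespace = win32com.client.Dispatch("Outlook.Application").GetNamespace("MAPI")
    inbox = namespace.GetDefaultFolder(6)  # 6 = olFolderInbox

    items = inbox.Items
    items.Sort("[ReceivedTime]", True)  # newest first

    emails = []
    for index, item in enumerate(items):
        if index >= max_items:
            break
        emails.append(extract_email_info_minimal(item))

    # Run the 3-step cache workflow (clear, load into memory, save to disk)
    ok = unified_cache_load_workflow(emails, operation_name="inbox_snapshot")
    clear_com_attribute_cache()  # keep the COM attribute cache from growing between calls
    return ok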
