Skip to main content
Glama
folder_operations.py33.4 kB
""" Folder operations for Outlook session management. This module provides folder-related operations such as creation, deletion, moving, and retrieval. """ # Standard library imports import time from datetime import datetime from functools import lru_cache from typing import Any, Dict, List, Optional, Tuple # Local application imports from ..logging_config import get_logger from ..utils import OutlookFolderType, retry_on_com_error from ..validation import BatchProcessing from .exceptions import FolderNotFoundError, InvalidParameterError, OperationFailedError logger = get_logger(__name__) class FolderOperations: """Handles all folder-related operations for Outlook.""" def __init__(self, session_manager): """Initialize with a session manager instance.""" self.session_manager = session_manager self._folder_cache = {} self._cache_timestamp = 0 self._cache_ttl = 300 # 5 minutes TTL for folder cache def _is_cache_valid(self): """Check if folder cache is still valid.""" return time.time() - self._cache_timestamp < self._cache_ttl def _get_cached_folder(self, folder_name: str): """Get folder from cache if available and valid.""" if self._is_cache_valid() and folder_name in self._folder_cache: logger.debug(f"Folder cache hit for: {folder_name}") return self._folder_cache[folder_name] return None def _cache_folder(self, folder_name: str, folder): """Cache folder for future use.""" self._folder_cache[folder_name] = folder self._cache_timestamp = time.time() logger.debug(f"Folder cached: {folder_name}") def clear_folder_cache(self): """Clear the folder cache.""" self._folder_cache.clear() self._cache_timestamp = 0 logger.info("Folder cache cleared") def get_folder(self, folder_name: Optional[str] = None): """Get specified folder or default inbox with caching.""" # Normalize folder name for caching cache_key = folder_name.lower() if folder_name else "inbox" # Try cache first cached_folder = self._get_cached_folder(cache_key) if cached_folder: return cached_folder # Get folder and cache it folder = self._get_folder_internal(folder_name) if folder: self._cache_folder(cache_key, folder) return folder def _get_folder_internal(self, folder_name: Optional[str] = None): """Internal method to get folder without caching.""" # Handle string "null" as well as actual None if not folder_name or folder_name == "null" or folder_name.lower() == "inbox": folder = self.session_manager.outlook_namespace.GetDefaultFolder(OutlookFolderType.INBOX) return folder elif folder_name.lower() == "sent items" or folder_name.lower() == "sent": folder = self.session_manager.outlook_namespace.GetDefaultFolder(OutlookFolderType.SENT_MAIL) return folder elif folder_name.lower() == "deleted items" or folder_name.lower() == "trash": folder = self.session_manager.outlook_namespace.GetDefaultFolder(OutlookFolderType.DELETED_ITEMS) return folder elif folder_name.lower() == "drafts": folder = self.session_manager.outlook_namespace.GetDefaultFolder(OutlookFolderType.DRAFTS) return folder elif folder_name.lower() == "outbox": folder = self.session_manager.outlook_namespace.GetDefaultFolder(OutlookFolderType.OUTBOX) return folder elif folder_name.lower() == "calendar": folder = self.session_manager.outlook_namespace.GetDefaultFolder(OutlookFolderType.CALENDAR) return folder elif folder_name.lower() == "contacts": folder = self.session_manager.outlook_namespace.GetDefaultFolder(OutlookFolderType.CONTACTS) return folder elif folder_name.lower() == "tasks": folder = self.session_manager.outlook_namespace.GetDefaultFolder(OutlookFolderType.TASKS) return folder else: folder = self._get_folder_by_name(folder_name) return folder def _get_folder_by_name(self, folder_name: str): """Find folder by name in folder hierarchy, supporting nested paths and mailbox-specific paths.""" try: # Handle nested folder paths (e.g., "Parent Folder/Child Folder" or "mailbox@domain.com/Inbox/Folder") if "/" in folder_name or "\\" in folder_name: # Use forward slash as path separator, but also support backslash path_parts = folder_name.replace("\\", "/").split("/") current_folder = None # Check if first part looks like an email address (mailbox-specific path) if "@" in path_parts[0] and "." in path_parts[0]: # This is a mailbox-specific path like "user@company.com/Inbox/Folder" mailbox_name = path_parts[0] # Find the mailbox folder for folder in self.session_manager.outlook_namespace.Folders: if folder.Name == mailbox_name: current_folder = folder break if not current_folder: raise FolderNotFoundError(f"Mailbox '{mailbox_name}' not found") # Navigate through the remaining path parts remaining_parts = path_parts[1:] else: # This is a regular path like "Inbox/Folder" or "Parent Folder/Child Folder" # Start with the top-level folders for folder in self.session_manager.outlook_namespace.Folders: if folder.Name == path_parts[0]: current_folder = folder break if not current_folder: raise FolderNotFoundError(f"Top-level folder '{path_parts[0]}' not found") remaining_parts = path_parts[1:] # Navigate through the remaining path parts - optimized search for part in remaining_parts: found = False # Try direct access first (much faster for well-known folders) try: current_folder = current_folder.Folders[part] found = True except Exception: # Fall back to iteration if direct access fails for subfolder in current_folder.Folders: if subfolder.Name == part: current_folder = subfolder found = True break if not found: raise FolderNotFoundError(f"Folder '{part}' not found in '{current_folder.Name}'") return current_folder else: # Original logic for single folder names - optimized # Try direct access first for common folders try: return self.session_manager.outlook_namespace.Folders[folder_name] except Exception: pass # Fall back to iteration for folder in self.session_manager.outlook_namespace.Folders: if folder.Name == folder_name: return folder # Try direct access for subfolders try: return folder.Folders[folder_name] except Exception: # Fall back to iteration for subfolders for subfolder in folder.Folders: if subfolder.Name == folder_name: return subfolder raise FolderNotFoundError(f"Folder '{folder_name}' not found") except Exception as e: logger.error(f"Error finding folder: {str(e)}") if isinstance(e, FolderNotFoundError): raise raise OperationFailedError(f"Error finding folder '{folder_name}': {str(e)}") @retry_on_com_error(max_attempts=3, initial_delay=1.0) def create_folder(self, folder_name: str, parent_folder_name: Optional[str] = None) -> str: """Create a new folder in the specified parent folder. Args: folder_name: Name of the folder to create parent_folder_name: Name of the parent folder (optional, defaults to Inbox) Returns: Success message """ if not folder_name or not isinstance(folder_name, str): raise InvalidParameterError("Folder name must be a non-empty string") try: parent_folder = self.get_folder(parent_folder_name) # Check if folder already exists for folder in parent_folder.Folders: if folder.Name == folder_name: return f"Folder '{folder_name}' already exists in '{parent_folder.Name}'" # Create the new folder new_folder = parent_folder.Folders.Add(folder_name) logger.info(f"Created folder '{folder_name}' in '{parent_folder.Name}'") return f"Folder '{folder_name}' created successfully in '{parent_folder.Name}'" except Exception as e: error_msg = f"Error creating folder '{folder_name}': {str(e)}" logger.error(error_msg) raise OperationFailedError(error_msg) @retry_on_com_error(max_attempts=3, initial_delay=1.0) def remove_folder(self, folder_name: str) -> str: """Remove an existing folder. Args: folder_name: Name or path of the folder to remove Returns: Success message """ if not folder_name or not isinstance(folder_name, str): raise InvalidParameterError("Folder name must be a non-empty string") try: folder = self._get_folder_by_name(folder_name) # Check if it's a default folder folder_path = folder.FolderPath if hasattr(folder, 'FolderPath') else folder.Name if self._is_default_folder(folder_path): raise OperationFailedError(f"Cannot remove default folder '{folder_name}'") # Get parent folder for the success message parent_folder = folder.Parent folder_name_only = folder.Name # Delete the folder folder.Delete() logger.info(f"Removed folder '{folder_name_only}' from '{parent_folder.Name}'") return f"Folder '{folder_name_only}' removed successfully from '{parent_folder.Name}'" except Exception as e: error_msg = f"Error removing folder '{folder_name}': {str(e)}" logger.error(error_msg) if isinstance(e, (FolderNotFoundError, OperationFailedError)): raise raise OperationFailedError(error_msg) @retry_on_com_error(max_attempts=3, initial_delay=1.0) def move_folder(self, source_folder_path: str, target_parent_path: str) -> str: """Move a folder to a different parent folder. Args: source_folder_path: Path to the source folder target_parent_path: Path to the target parent folder Returns: Success message """ if not source_folder_path or not isinstance(source_folder_path, str): raise InvalidParameterError("Source folder path must be a non-empty string") if not target_parent_path or not isinstance(target_parent_path, str): raise InvalidParameterError("Target parent path must be a non-empty string") try: # Get source folder source_folder = self._get_folder_by_name(source_folder_path) # Get target parent folder target_parent = self._get_folder_by_name(target_parent_path) # Check if it's a default folder (cannot move default folders) source_folder_path_attr = source_folder.FolderPath if hasattr(source_folder, 'FolderPath') else source_folder.Name if self._is_default_folder(source_folder_path_attr): raise OperationFailedError(f"Cannot move default folder '{source_folder_path}'") # Move the folder source_folder.MoveTo(target_parent) logger.info(f"Moved folder '{source_folder.Name}' to '{target_parent.Name}'") return f"Folder '{source_folder.Name}' moved successfully to '{target_parent.Name}'" except Exception as e: error_msg = f"Error moving folder '{source_folder_path}' to '{target_parent_path}': {str(e)}" logger.error(error_msg) if isinstance(e, (FolderNotFoundError, OperationFailedError)): raise raise OperationFailedError(error_msg) def get_folder_list(self): """Get list of all folders.""" try: folders = [] for folder in self.session_manager.outlook_namespace.Folders: folders.append(folder) # Also add subfolders self._add_subfolders(folder, folders) return folders except Exception as e: logger.error(f"Error getting folder list: {str(e)}") raise OperationFailedError(f"Error getting folder list: {str(e)}") def _add_subfolders(self, folder, folders_list): """Recursively add subfolders to the list.""" try: for subfolder in folder.Folders: folders_list.append(subfolder) self._add_subfolders(subfolder, folders_list) except Exception as e: logger.warning(f"Error accessing subfolders of {folder.Name}: {str(e)}") def _is_default_folder(self, folder_path: str) -> bool: """Check if a folder is a default Outlook folder.""" default_folders = [ "Inbox", "Sent Items", "Deleted Items", "Drafts", "Outbox", "Junk Email", "Archive", "Conversation History" ] # Extract folder name from path folder_name = folder_path.split("\\")[-1] if "\\" in folder_path else folder_path return folder_name in default_folders def get_folder_emails(self, folder_name: str = "Inbox", max_emails: int = 100, fast_mode: bool = True, days_filter: int = None) -> Tuple[List[Dict[str, Any]], str]: """ Get emails from a folder with pagination support - optimized for performance. Args: folder_name: Name of the folder to get emails from max_emails: Maximum number of emails to return fast_mode: If True, use minimal extraction for better performance days_filter: Number of days to filter by (None for number-based loading) Returns: Tuple of (list of email dictionaries, status message) """ start_time = time.time() try: # Validate input parameters if not folder_name or not folder_name.strip(): return [], f"Error: Invalid folder name: {folder_name}" if max_emails < 1: return [], f"Error: Invalid max_emails value: {max_emails}" except Exception as e: logger.error(f"Validation error in get_folder_emails: {e}") return [], f"Error: Invalid parameters: {e}" try: # Get folder with caching folder = self.get_folder(folder_name) if not folder: return [], f"Error: Folder '{folder_name}' not found" logger.info(f"Getting emails from folder '{folder_name}' with limit {max_emails}, fast_mode={fast_mode}") # Use server-side filtering with Restrict method for better performance from datetime import datetime, timedelta # Determine optimal filtering strategy based on parameters items = [] filter_time = time.time() if days_filter is None: # Number-based loading: get items without date filtering, but ensure we get newest first logger.info(f"Number-based loading: getting up to {max_emails} items without date filter") # Use a much smaller date range initially for better performance # Start with 7 days, then expand gradually if needed days_to_try = [7, 14, 30, 60, 90] items = [] for days in days_to_try: date_limit = datetime.now() - timedelta(days=days) date_filter = f"@SQL=urn:schemas:httpmail:datereceived >= '{date_limit.strftime('%Y-%m-%d')}'" try: filtered_items = folder.Items.Restrict(date_filter) if filtered_items.Count > 0: # Use a more efficient approach - get only what we need # Instead of converting entire collection to list, iterate efficiently temp_items = [] count = 0 # Use GetLast/GetPrevious for newest-first order (better performance) item = filtered_items.GetLast() while item and count < max_emails * 2: # Get 2x to account for filtering temp_items.append(item) count += 1 item = filtered_items.GetPrevious() items = temp_items logger.info(f"{days}-day filter returned {len(items)} items in {time.time() - filter_time:.2f}s") # If we got enough items, break out of the loop if len(items) >= max_emails: break else: continue # Try next larger time window except Exception as e: logger.warning(f"Restrict method failed for {days} days: {e}") continue # If no items found with date filtering, try reverse indexing as fallback if not items: logger.info("No items found with date filtering, trying reverse indexing fallback") try: # Get items in reverse order (newest first) using a more efficient approach total_count = folder.Items.Count if total_count > 0: # Start from the end (newest items) and work backwards efficiently # Use GetLast/GetPrevious for better performance items = [] item = folder.Items.GetLast() count = 0 while item and count < max_emails * 2: # Get 2x to be safe items.append(item) count += 1 item = folder.Items.GetPrevious() logger.info(f"Retrieved {len(items)} items using GetLast/GetPrevious in {time.time() - filter_time:.2f}s") else: items = [] except Exception as final_e: logger.error(f"All fallback methods failed: {final_e}") items = [] else: # Time-based loading: use date filtering with the specified days_filter value if max_emails <= 50: date_limit = datetime.now() - timedelta(days=days_filter) # Use the actual days_filter value date_filter = f"@SQL=urn:schemas:httpmail:datereceived >= '{date_limit.strftime('%Y-%m-%d')}'" logger.info(f"Date filter for {days_filter} days: {date_filter} (limit: {date_limit})") try: filtered_items = folder.Items.Restrict(date_filter) if filtered_items.Count > 0: # Use efficient iteration instead of list conversion items = [] count = 0 item = filtered_items.GetLast() while item and count < max_emails * 2: items.append(item) count += 1 item = filtered_items.GetPrevious() logger.info(f"{days_filter}-day filter returned {len(items)} items in {time.time() - filter_time:.2f}s") # Respect the days parameter strictly - do not expand date range # Use only the emails found within the specified date range else: items = [] except Exception as e: logger.warning(f"Restrict method failed: {e}, using sorted list approach") # Use sorted list approach to get newest emails first items = [] try: all_items = list(folder.Items) # Sort by received time (newest first) before limiting all_items.sort(key=lambda x: x.ReceivedTime if hasattr(x, 'ReceivedTime') and x.ReceivedTime else datetime.min, reverse=True) items = all_items[:max_emails * 2] # Get 2x to account for filtering except Exception as inner_e: logger.error(f"Sorted list approach failed: {inner_e}") # Final fallback - try GetLast/GetPrevious for newest-first order try: item = folder.Items.GetLast() count = 0 while item and count < max_emails * 2: items.append(item) item = folder.Items.GetPrevious() count += 1 except Exception as final_e: logger.error(f"All fallback methods failed: {final_e}") items = [] else: # For larger requests, use the specified days_filter value date_limit = datetime.now() - timedelta(days=days_filter) date_filter = f"@SQL=urn:schemas:httpmail:datereceived >= '{date_limit.strftime('%Y-%m-%d')}'" try: filtered_items = folder.Items.Restrict(date_filter) if filtered_items.Count > 0: # Use efficient iteration instead of list conversion with newest-first order items = [] count = 0 item = filtered_items.GetLast() while item and count < max_emails * 2: items.append(item) count += 1 item = filtered_items.GetPrevious() logger.info(f"{days_filter}-day filter returned {len(items)} items in {time.time() - filter_time:.2f}s") else: items = [] except Exception as e: logger.warning(f"Restrict method failed: {e}, falling back to sorted list approach") # Use sorted list approach to get newest emails first items = [] try: all_items = list(folder.Items) # Sort by received time (newest first) before limiting all_items.sort(key=lambda x: x.ReceivedTime if hasattr(x, 'ReceivedTime') and x.ReceivedTime else datetime.min, reverse=True) items = all_items[:max_emails * 2] # Get 2x to account for filtering except Exception as inner_e: logger.error(f"Sorted list approach failed: {inner_e}") # Final fallback - try GetLast/GetPrevious for newest-first order try: item = folder.Items.GetLast() count = 0 while item and count < max_emails * 2: items.append(item) item = folder.Items.GetPrevious() count += 1 except Exception as final_e: logger.error(f"All fallback methods failed: {final_e}") items = [] if not items: return [], f"No emails found in '{folder_name}'" # Quick sort by received time (newest first) - only sort what we need sort_time = time.time() try: # Only sort the items we actually need, not the entire collection items.sort(key=lambda x: x.ReceivedTime if hasattr(x, 'ReceivedTime') and x.ReceivedTime else datetime.min, reverse=True) logger.info(f"Sorting completed in {time.time() - sort_time:.2f}s") except Exception as e: logger.warning(f"Error sorting emails: {e}") # Limit the number of emails limited_items = items[:max_emails] # Batch process emails for better performance extraction_time = time.time() email_list = [] # Import extraction functions once, outside the loop if fast_mode: from ..email_search.search_common import extract_email_info_minimal extractor = extract_email_info_minimal else: from ..email_search.search_common import extract_email_info extractor = extract_email_info # Process in batches with progress indication batch_size = BatchProcessing.FAST_MODE_BATCH_SIZE if fast_mode else BatchProcessing.FULL_EXTRACTION_BATCH_SIZE total_items = len(limited_items) for i in range(0, total_items, batch_size): batch = limited_items[i:i + batch_size] batch_start = time.time() for item in batch: try: email_data = extractor(item) if email_data and (fast_mode and email_data.get("entry_id") or not fast_mode): email_list.append(email_data) except Exception as e: logger.warning(f"Failed to process email: {e}") continue # Log progress for large batches if total_items > 100 and (i + batch_size) % 100 == 0: progress = (i + batch_size) / total_items * 100 logger.info(f"Progress: {progress:.1f}% ({i + batch_size}/{total_items} items processed)") logger.info(f"Email extraction completed in {time.time() - extraction_time:.2f}s") if not email_list: return [], f"No valid emails found in '{folder_name}'" # Use unified cache loading workflow for consistent cache management cache_time = time.time() from ..email_search.search_common import unified_cache_load_workflow success = unified_cache_load_workflow(email_list, f"get_folder_emails({folder_name})") if success: logger.info(f"Unified cache workflow completed in {time.time() - cache_time:.2f}s") else: logger.warning("Unified cache workflow failed") total_time = time.time() - start_time message = f"Found {len(email_list)} emails in '{folder_name}' (completed in {total_time:.2f}s)" # Log performance metrics logger.info(f"Performance: Folder='{folder_name}', Emails={len(email_list)}, TotalTime={total_time:.2f}s, " f"FilterTime={filter_time - start_time:.2f}s, SortTime={sort_time - filter_time:.2f}s, " f"ExtractTime={extraction_time - sort_time:.2f}s, CacheTime={cache_time - extraction_time:.2f}s") return email_list, message except Exception as e: error_msg = f"Error getting emails from folder: {e}" logger.error(error_msg) return [], f"Error: {error_msg}" def list_folders(): """Get list of all folder names. Returns: List of folder names """ from ..outlook_session.session_manager import OutlookSessionManager try: with OutlookSessionManager() as session_manager: folder_ops = FolderOperations(session_manager) folders = folder_ops.get_folder_list() return [folder.Name for folder in folders] except Exception as e: logger.error(f"Error getting folder list: {str(e)}") return [] def create_folder(folder_name: str, parent_folder_name: Optional[str] = None) -> str: """Create a new folder in the specified parent folder. Args: folder_name: Name of the folder to create parent_folder_name: Name of the parent folder (optional, defaults to Inbox) Returns: Success message """ from ..outlook_session.session_manager import OutlookSessionManager try: with OutlookSessionManager() as session_manager: folder_ops = FolderOperations(session_manager) return folder_ops.create_folder(folder_name, parent_folder_name) except Exception as e: logger.error(f"Error creating folder: {str(e)}") return f"Error: {str(e)}" def remove_folder(folder_name: str) -> str: """Remove an existing folder. Args: folder_name: Name or path of the folder to remove Returns: Success message """ from ..outlook_session.session_manager import OutlookSessionManager try: with OutlookSessionManager() as session_manager: folder_ops = FolderOperations(session_manager) return folder_ops.remove_folder(folder_name) except Exception as e: logger.error(f"Error removing folder: {str(e)}") return f"Error: {str(e)}" def move_folder(source_folder_path: str, target_parent_path: str) -> str: """Move a folder to a different parent folder. Args: source_folder_path: Path to the source folder target_parent_path: Path to the target parent folder Returns: Success message """ from ..outlook_session.session_manager import OutlookSessionManager try: with OutlookSessionManager() as session_manager: folder_ops = FolderOperations(session_manager) return folder_ops.move_folder(source_folder_path, target_parent_path) except Exception as e: logger.error(f"Error moving folder: {str(e)}") return f"Error: {str(e)}" def get_folder_emails(folder_name: str = "Inbox", max_emails: int = 100, days_filter: int = None) -> Tuple[List[Dict[str, Any]], str]: """Get emails from a folder with pagination support. Args: folder_name: Name of the folder to get emails from max_emails: Maximum number of emails to return days_filter: Number of days to filter by (None for number-based loading) Returns: Tuple of (list of email dictionaries, status message) """ from ..outlook_session.session_manager import OutlookSessionManager try: with OutlookSessionManager() as session_manager: folder_ops = FolderOperations(session_manager) return folder_ops.get_folder_emails(folder_name, max_emails, True, days_filter) except Exception as e: logger.error(f"Error getting folder emails: {str(e)}") return [], f"Error: {str(e)}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/marlonluo2018/outlook-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server