Skip to main content
Glama
validation.py14.3 kB
"""Unified validation utilities for Outlook MCP Server. This module consolidates all common validation patterns to eliminate code duplication. """ from typing import Optional, List, Union from .logging_config import get_logger from .config import ( outlook_config, email_format_config, attachment_config, email_metadata_config, batch_config, display_config, performance_config, validation_config ) logger = get_logger(__name__) class ValidationError(Exception): """Custom exception for validation errors.""" pass class OutlookConstants: """Outlook COM constants (deprecated - use config.outlook_config).""" OL_MAIL_ITEM = outlook_config.OL_MAIL_ITEM OL_CONTACT_ITEM = outlook_config.OL_CONTACT_ITEM OL_DISTRIBUTION_LIST_ITEM = outlook_config.OL_DISTRIBUTION_LIST_ITEM OL_JOURNAL_ITEM = outlook_config.OL_JOURNAL_ITEM OL_NOTE_ITEM = outlook_config.OL_NOTE_ITEM OL_POST_ITEM = outlook_config.OL_POST_ITEM OL_TASK_ITEM = outlook_config.OL_TASK_ITEM class BodyFormat: """Email body format constants (deprecated - use config.email_format_config).""" OL_FORMAT_PLAIN = email_format_config.OL_FORMAT_PLAIN OL_FORMAT_HTML = email_format_config.OL_FORMAT_HTML OL_FORMAT_RICH_TEXT = email_format_config.OL_FORMAT_RICH_TEXT class AttachmentType: """Attachment type constants (deprecated - use config.attachment_config).""" BY_VALUE = attachment_config.BY_VALUE BY_REFERENCE = attachment_config.BY_REFERENCE EMBEDDED = attachment_config.EMBEDDING OLE = attachment_config.OLE class Importance: """Email importance constants (deprecated - use config.email_metadata_config).""" LOW = email_metadata_config.IMPORTANCE_LOW NORMAL = email_metadata_config.IMPORTANCE_NORMAL HIGH = email_metadata_config.IMPORTANCE_HIGH class Sensitivity: """Email sensitivity constants (deprecated - use config.email_metadata_config).""" NORMAL = email_metadata_config.SENSITIVITY_NORMAL PERSONAL = email_metadata_config.SENSITIVITY_PERSONAL PRIVATE = email_metadata_config.SENSITIVITY_PRIVATE CONFIDENTIAL = email_metadata_config.SENSITIVITY_CONFIDENTIAL class FlagStatus: """Email flag status constants (deprecated - use config.email_metadata_config).""" UNFLAGGED = email_metadata_config.FLAG_STATUS_UNFLAGGED FLAGGED = email_metadata_config.FLAG_STATUS_FLAGGED COMPLETE = email_metadata_config.FLAG_STATUS_COMPLETE class BatchLimits: """Batch operation limits (deprecated - use config.batch_config).""" OUTLOOK_BCC_LIMIT = batch_config.OUTLOOK_BCC_LIMIT IMAGE_EMBEDDING_SIZE_THRESHOLD = batch_config.IMAGE_EMBEDDING_SIZE_THRESHOLD class CacheThresholds: """Cache performance thresholds (deprecated - use config.performance_config).""" BINARY_SEARCH_THRESHOLD = performance_config.BINARY_SEARCH_THRESHOLD MAX_CACHE_SIZE = performance_config.MAX_CACHE_SIZE class DisplayConstants: """Display formatting constants (deprecated - use config.display_config).""" SEPARATOR_LINE_LENGTH = display_config.SEPARATOR_LINE_LENGTH class BatchProcessing: """Batch processing constants for email operations (deprecated - use config.batch_config).""" DEFAULT_BATCH_SIZE = batch_config.DEFAULT_BATCH_SIZE FAST_MODE_BATCH_SIZE = batch_config.FAST_MODE_BATCH_SIZE FULL_EXTRACTION_BATCH_SIZE = batch_config.FULL_EXTRACTION_BATCH_SIZE def validate_search_term(search_term: str) -> str: """Validate search term parameter. Args: search_term: Search term to validate Returns: Validated search term Raises: ValidationError: If search term is invalid """ if not search_term or not isinstance(search_term, str): raise ValidationError("Search term must be a non-empty string") search_term = search_term.strip() if not search_term: raise ValidationError("Search term must be a non-empty string") return search_term def validate_days_parameter(days: int, min_days: int = 1, max_days: int = 30) -> int: """Validate days parameter. Args: days: Days parameter to validate min_days: Minimum allowed days (default: 1) max_days: Maximum allowed days (default: 30) Returns: Validated days parameter Raises: ValidationError: If days parameter is invalid """ if not isinstance(days, int): raise ValidationError("Days parameter must be an integer") if days < min_days or days > max_days: raise ValidationError(f"Days parameter must be between {min_days} and {max_days}") return days def validate_folder_name(folder_name: Optional[str]) -> Optional[str]: """Validate and normalize folder name. Args: folder_name: Folder name to validate Returns: Normalized folder name or None Raises: ValidationError: If folder name is invalid """ if folder_name is None: return None if not isinstance(folder_name, str): raise ValidationError("Folder name must be a string or None") folder_name = folder_name.strip() if folder_name.lower() in ["null", ""]: return None return folder_name if folder_name else None def validate_email_address(email: str) -> str: """Validate email address format with comprehensive checks. Args: email: Email address to validate Returns: Validated email address (trimmed and lowercased domain) Raises: ValidationError: If email address is invalid """ import re if not email or not isinstance(email, str): raise ValidationError("Email address must be a non-empty string") email = email.strip() if len(email) > validation_config.MAX_EMAIL_LENGTH: raise ValidationError(f"Email address is too long (maximum {validation_config.MAX_EMAIL_LENGTH} characters)") if len(email.split("@")[0]) > validation_config.MAX_EMAIL_LOCAL_PART_LENGTH: raise ValidationError(f"Email local part is too long (maximum {validation_config.MAX_EMAIL_LOCAL_PART_LENGTH} characters)") pattern = r"^[a-zA-Z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-zA-Z0-9!#$%&'*+/=?^_`{|}~-]+)*@(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$" if not re.match(pattern, email): raise ValidationError(f"Invalid email address format: {email}") return email def validate_email_addresses(emails: Union[str, List[str]]) -> List[str]: """Validate one or more email addresses. Args: emails: Single email string or list of email strings Returns: List of validated email addresses Raises: ValidationError: If any email address is invalid """ if emails is None: raise ValidationError("Email addresses cannot be None") if isinstance(emails, str): emails = [emails] if not isinstance(emails, list): raise ValidationError("Email addresses must be a string or list of strings") validated_emails = [] for email in emails: if not isinstance(email, str) or not email.strip(): raise ValidationError("All email addresses must be non-empty strings") validated_emails.append(validate_email_address(email)) return validated_emails def validate_email_number(email_number: int, cache_size: int) -> int: """Validate email number against cache size. Args: email_number: Email number to validate cache_size: Current cache size Returns: Validated email number Raises: ValidationError: If email number is out of range """ if not isinstance(email_number, int): raise ValidationError("Email number must be an integer") if email_number < 1 or email_number > cache_size: raise ValidationError( f"Email number {email_number} is out of range. Available range: 1-{cache_size}" ) return email_number def validate_page_parameter(page: int, total_pages: int) -> int: """Validate page parameter. Args: page: Page number to validate total_pages: Total number of pages available Returns: Validated page number Raises: ValidationError: If page number is invalid """ if not isinstance(page, int): raise ValidationError("Page parameter must be an integer") if page < 1: raise ValidationError("Page parameter must be at least 1") if total_pages > 0 and page > total_pages: raise ValidationError( f"Page {page} is out of range. Total pages: {total_pages}" ) return page def validate_not_empty(value: str, field_name: str = "Field") -> str: """Validate that a string value is not empty or whitespace. Args: value: String value to validate field_name: Name of the field for error messages Returns: Validated string value Raises: ValidationError: If value is empty or whitespace """ if value is None or not isinstance(value, str): raise ValidationError(f"{field_name} must be a non-empty string") value = value.strip() if not value: raise ValidationError(f"{field_name} must not be empty") return value def sanitize_search_term(search_term: str) -> str: """Sanitize search term to prevent DASL injection. Args: search_term: Raw search term from user Returns: Sanitized search term """ if not search_term: return "" return "".join(c for c in search_term if c.isalnum() or c in " .-_@").strip() def normalize_email_address(email: str) -> str: """Normalize email address for comparison. Handles case sensitivity, display name formats, and extra whitespace. Args: email: Email address to normalize Returns: Normalized email address for comparison """ if not email: return "" normalized = email.strip().rstrip(";").strip() if "<" in normalized and ">" in normalized: start = normalized.find("<") end = normalized.find(">") if start < end: normalized = normalized[start + 1 : end] return normalized.lower() def validate_recipients_list( recipients: Optional[Union[str, List[str]]] ) -> Optional[List[str]]: """Validate and normalize recipients list. Args: recipients: Single email string, list of emails, or None Returns: List of validated emails or None Raises: ValidationError: If recipients are invalid """ if recipients is None: return None if isinstance(recipients, str): if not recipients.strip(): return None recipients = [recipients] if not isinstance(recipients, list): raise ValidationError("Recipients must be a string or list of strings") validated_emails = [] for email in recipients: if isinstance(email, str) and email.strip(): validated_emails.append(email.strip()) return validated_emails if validated_emails else None def get_folder_path_safe(folder_name: Optional[str] = None) -> str: """Get safe folder path, defaulting to Inbox if not provided. Args: folder_name: Folder name to use Returns: Safe folder path """ return folder_name if folder_name else "Inbox" def validate_cache_available(cache_size: int) -> None: """Validate that email cache is available and not empty. Args: cache_size: Current cache size Raises: ValidationError: If cache is empty """ if cache_size == 0: raise ValidationError("No emails available - please list emails first.") def execute_cache_loading_operation( operation_func, operation_name: str, validation_func=None, validation_params=None, message_suffix: str = None, **operation_kwargs ) -> dict: """Unified cache loading operation wrapper for tool functions. This function provides a consistent interface for all cache loading operations including list, search, and load tools. It handles validation, error handling, and message formatting uniformly. Args: operation_func: The backend function to execute (e.g., list_recent_emails, search_email_by_subject) operation_name: Name of the operation for logging and error messages validation_func: Optional validation function to call before executing the operation validation_params: Optional dictionary of parameters to pass to validation function message_suffix: Optional suffix to append to success message (e.g., " (max 30 days)") **operation_kwargs: Keyword arguments to pass to the operation function Returns: dict: Response containing success or error message: { "type": "text", "text": "Found X emails in 'Inbox' from last 7 days (max 30 days). Use 'view_email_cache_tool' to view them." } Example: result = execute_cache_loading_operation( operation_func=list_recent_emails, operation_name="list_recent_emails", validation_func=lambda: validate_days_parameter(days), validation_params=None, message_suffix=" (max 30 days)", folder_name=folder_path, days=days ) """ try: if validation_func and validation_params: validation_func(**validation_params) elif validation_func: validation_func() except ValidationError as e: return {"type": "text", "text": f"Validation error: {str(e)}"} logger.info(f"{operation_name} called with {operation_kwargs}") try: emails, message = operation_func(**operation_kwargs) logger.info(f"{operation_name} returned: {len(emails)} emails, message: {message}") if message_suffix: message = message + message_suffix if "view_email_cache_tool" not in message: message = message + ". Use 'view_email_cache_tool' to view them." return {"type": "text", "text": message} except Exception as e: logger.error(f"Error in {operation_name}: {e}") import traceback traceback.print_exc() return {"type": "text", "text": f"Error retrieving emails: {str(e)}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/marlonluo2018/outlook-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server