Skip to main content
Glama

IMAP MCP Server

by non-dirty
"""IMAP client implementation.""" import email import logging from datetime import datetime, timedelta from email.message import Message from typing import Dict, List, Optional, Set, Tuple, Union import imapclient from imapclient.response_types import SearchIds from imap_mcp.config import ImapConfig from imap_mcp.models import Email from imap_mcp.oauth2 import get_access_token, generate_oauth2_string logger = logging.getLogger(__name__) class ImapClient: """IMAP client for interacting with email servers.""" def __init__(self, config: ImapConfig, allowed_folders: Optional[List[str]] = None): """Initialize IMAP client. Args: config: IMAP configuration allowed_folders: List of allowed folders (None means all folders) """ self.config = config self.allowed_folders = set(allowed_folders) if allowed_folders else None self.client = None self.folder_cache: Dict[str, List[str]] = {} self.connected = False def connect(self) -> None: """Connect to IMAP server. Raises: ConnectionError: If connection fails """ try: self.client = imapclient.IMAPClient( self.config.host, port=self.config.port, ssl=self.config.use_ssl, ) # Use OAuth2 for Gmail if configured if self.config.requires_oauth2: logger.info(f"Using OAuth2 authentication for {self.config.host}") # Get fresh access token if not self.config.oauth2: raise ValueError("OAuth2 configuration is required for Gmail") access_token, _ = get_access_token(self.config.oauth2) # Authenticate with XOAUTH2 # Use the oauth_login method which properly formats the XOAUTH2 string self.client.oauth2_login(self.config.username, access_token) else: # Standard password authentication if not self.config.password: raise ValueError("Password is required for authentication") self.client.login(self.config.username, self.config.password) self.connected = True logger.info(f"Connected to IMAP server {self.config.host}") except Exception as e: self.connected = False logger.error(f"Failed to connect to IMAP server: {e}") raise ConnectionError(f"Failed to 
connect to IMAP server: {e}") def disconnect(self) -> None: """Disconnect from IMAP server.""" if self.client: try: self.client.logout() except Exception as e: logger.warning(f"Error during IMAP logout: {e}") finally: self.client = None self.connected = False logger.info("Disconnected from IMAP server") def ensure_connected(self) -> None: """Ensure connection is established. Raises: ConnectionError: If not connected and connection fails """ if not self.connected or not self.client: self.connect() def list_folders(self, refresh: bool = False) -> List[str]: """List available folders. Args: refresh: Force refresh folder list cache Returns: List of folder names Raises: ConnectionError: If not connected and connection fails """ self.ensure_connected() # Check cache first if not refresh and self.folder_cache: return list(self.folder_cache.keys()) # Get folders from server folders = [] for flags, delimiter, name in self.client.list_folders(): if isinstance(name, bytes): # Convert bytes to string if necessary name = name.decode("utf-8") # Filter folders if allowed_folders is set if self.allowed_folders is not None and name not in self.allowed_folders: continue folders.append(name) self.folder_cache[name] = flags logger.debug(f"Listed {len(folders)} folders") return folders def select_folder(self, folder: str) -> Dict: """Select a folder. Args: folder: Folder name Returns: Folder information Raises: ConnectionError: If not connected and connection fails ValueError: If folder is not allowed """ self.ensure_connected() # Check if folder is allowed if self.allowed_folders is not None and folder not in self.allowed_folders: raise ValueError(f"Folder '{folder}' is not allowed") # Select folder result = self.client.select_folder(folder) logger.debug(f"Selected folder {folder}") return result def search( self, criteria: Union[str, List, Tuple], folder: str = "INBOX", charset: Optional[str] = None, ) -> List[int]: """Search for messages. 
Args: criteria: Search criteria folder: Folder to search in charset: Character set for search criteria Returns: List of message UIDs Raises: ConnectionError: If not connected and connection fails """ self.ensure_connected() self.select_folder(folder) if isinstance(criteria, str): # Predefined criteria strings criteria_map = { "all": "ALL", "unseen": "UNSEEN", "seen": "SEEN", "answered": "ANSWERED", "unanswered": "UNANSWERED", "deleted": "DELETED", "undeleted": "UNDELETED", "flagged": "FLAGGED", "unflagged": "UNFLAGGED", "recent": "RECENT", "today": ["SINCE", datetime.now().date()], "yesterday": [ "SINCE", (datetime.now() - timedelta(days=1)).date(), "BEFORE", datetime.now().date(), ], "week": ["SINCE", (datetime.now() - timedelta(days=7)).date()], "month": ["SINCE", (datetime.now() - timedelta(days=30)).date()], } if criteria.lower() in criteria_map: criteria = criteria_map[criteria.lower()] results = self.client.search(criteria, charset=charset) logger.debug(f"Search returned {len(results)} results") return list(results) def fetch_email(self, uid: int, folder: str = "INBOX") -> Optional[Email]: """Fetch a single email by UID. 
Args: uid: Email UID folder: Folder to fetch from Returns: Email object or None if not found Raises: ConnectionError: If not connected and connection fails """ self.ensure_connected() self.select_folder(folder) # Fetch message data result = self.client.fetch([uid], ["BODY.PEEK[]", "FLAGS"]) if not result or uid not in result: logger.warning(f"Message with UID {uid} not found in folder {folder}") return None # Parse message message_data = result[uid] raw_message = message_data[b"BODY[]"] flags = message_data[b"FLAGS"] # Convert flags to strings str_flags = [ f.decode("utf-8") if isinstance(f, bytes) else f for f in flags ] # Parse email message = email.message_from_bytes(raw_message) email_obj = Email.from_message(message, uid=uid, folder=folder) email_obj.flags = str_flags return email_obj def fetch_emails( self, uids: List[int], folder: str = "INBOX", limit: Optional[int] = None, ) -> Dict[int, Email]: """Fetch multiple emails by UIDs. Args: uids: List of email UIDs folder: Folder to fetch from limit: Maximum number of emails to fetch Returns: Dictionary mapping UIDs to Email objects Raises: ConnectionError: If not connected and connection fails """ self.ensure_connected() self.select_folder(folder) # Apply limit if specified if limit is not None and limit < len(uids): uids = uids[:limit] if not uids: return {} # Fetch message data result = self.client.fetch(uids, ["BODY.PEEK[]", "FLAGS"]) emails = {} for uid, message_data in result.items(): raw_message = message_data[b"BODY[]"] flags = message_data[b"FLAGS"] # Convert flags to strings str_flags = [ f.decode("utf-8") if isinstance(f, bytes) else f for f in flags ] # Parse email message = email.message_from_bytes(raw_message) email_obj = Email.from_message(message, uid=uid, folder=folder) email_obj.flags = str_flags emails[uid] = email_obj return emails def mark_email( self, uid: int, folder: str, flag: str, value: bool = True, ) -> bool: """Mark email with flag. 
Args: uid: Email UID folder: Folder containing the email flag: Flag to set or remove value: True to set, False to remove Returns: True if successful Raises: ConnectionError: If not connected and connection fails """ self.ensure_connected() self.select_folder(folder) try: if value: self.client.add_flags([uid], flag) logger.debug(f"Added flag {flag} to message {uid}") else: self.client.remove_flags([uid], flag) logger.debug(f"Removed flag {flag} from message {uid}") return True except Exception as e: logger.error(f"Failed to mark email: {e}") return False def move_email(self, uid: int, source_folder: str, target_folder: str) -> bool: """Move email to another folder. Args: uid: Email UID source_folder: Source folder target_folder: Target folder Returns: True if successful Raises: ConnectionError: If not connected and connection fails ValueError: If folder is not allowed """ self.ensure_connected() # Check if folders are allowed if self.allowed_folders is not None: if source_folder not in self.allowed_folders: raise ValueError(f"Source folder '{source_folder}' is not allowed") if target_folder not in self.allowed_folders: raise ValueError(f"Target folder '{target_folder}' is not allowed") # Select source folder self.select_folder(source_folder) try: # Move email (copy + delete) self.client.copy([uid], target_folder) self.client.add_flags([uid], r"\Deleted") self.client.expunge() logger.debug(f"Moved message {uid} from {source_folder} to {target_folder}") return True except Exception as e: logger.error(f"Failed to move email: {e}") return False def delete_email(self, uid: int, folder: str) -> bool: """Delete email. 
Args: uid: Email UID folder: Folder containing the email Returns: True if successful Raises: ConnectionError: If not connected and connection fails """ self.ensure_connected() self.select_folder(folder) try: self.client.add_flags([uid], r"\Deleted") self.client.expunge() logger.debug(f"Deleted message {uid} from {folder}") return True except Exception as e: logger.error(f"Failed to delete email: {e}") return False

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/non-dirty/imap-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.