Skip to main content
Glama
leeguooooo
by leeguooooo
uid_search.py11.8 kB
"""
IMAP UID-based search implementation for better performance and stability.

The module exposes :class:`UIDSearchEngine`, a thin helper around an
``imaplib`` connection that builds ``UID SEARCH`` criteria, runs them and
batch-fetches the matching messages with ``UID FETCH``.
"""
import email
import email.errors
import email.message
import email.utils
import imaplib
import logging
import re
from datetime import datetime, timedelta
from email.header import decode_header
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

# English month abbreviations used in IMAP date syntax.  Formatting through
# this table keeps the output independent of the process locale, unlike
# ``strftime('%b')``.
_IMAP_MONTHS = (
    'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
    'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec',
)

# Input date layouts accepted by ``_format_date`` (tried in this order).
_DATE_INPUT_FORMATS = ('%Y-%m-%d', '%d-%m-%Y', '%Y/%m/%d')

# Characters that cannot appear in an unquoted IMAP mailbox atom.
_MAILBOX_SPECIALS = frozenset(' (){%*"\\')

_UID_RE = re.compile(br'UID (\d+)')
_FLAGS_RE = re.compile(br'FLAGS \((.*?)\)')
# Matches a FETCH descriptor whose trailing literal is the BODY[TEXT] section.
_BODY_TEXT_LITERAL_RE = re.compile(
    br'BODY\[TEXT\](?:<\d+>)?\s*\{\d+\}\s*$', re.IGNORECASE
)


class UIDSearchEngine:
    """Optimized search engine using IMAP UIDs instead of sequence numbers."""

    def __init__(self, connection: imaplib.IMAP4_SSL):
        """Store the (already authenticated) IMAP connection to operate on."""
        self.connection = connection

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def search_by_criteria(
        self,
        query: str,
        search_in: str = "all",
        unread_only: bool = False,
        date_from: Optional[str] = None,
        date_to: Optional[str] = None,
        folder: str = "INBOX",
        limit: Optional[int] = 50
    ) -> Tuple[List[str], int]:
        """
        Search emails using IMAP UID commands.

        Args:
            query: Text to look for; an empty string applies only the
                date/unread filters.
            search_in: ``"subject"``, ``"from"``, ``"to"``, ``"body"``; any
                other value searches all of them.
            unread_only: Restrict the search to ``UNSEEN`` messages.
            date_from: Lower date bound (see ``_format_date`` for formats).
            date_to: Upper date bound, passed to IMAP ``BEFORE``.
            folder: Mailbox to select before searching.
            limit: Maximum number of UIDs returned.  The last ``limit`` UIDs
                of the server's answer are kept.  ``None`` disables the cap;
                zero or a negative value returns no UIDs.

        Returns:
            Tuple of (list of UIDs, total count).  ``([], 0)`` is returned
            when the folder cannot be selected or the search fails.
        """
        try:
            status, _ = self.connection.select(self._quote_mailbox(folder))
            if status != 'OK':
                logger.error("UID search failed: Cannot select folder %s", folder)
                return [], 0

            search_criteria = self._build_search_criteria(
                query, search_in, unread_only, date_from, date_to
            )
            logger.info("Searching with criteria: %s", search_criteria)

            status, data = self._run_uid_search(search_criteria)
            if status != 'OK':
                return [], 0

            uids = data[0].split() if data and data[0] else []
            total_count = len(uids)

            if limit is not None:
                # ``uids[-0:]`` would be the whole list, so non-positive
                # limits are handled explicitly.
                uids = uids[-limit:] if limit > 0 else []

            return [uid.decode() for uid in uids], total_count

        except Exception as e:  # API boundary: report failure as "no results"
            logger.error("UID search failed: %s", e)
            return [], 0

    def fetch_emails_by_uids(
        self,
        uids: List[str],
        include_body: bool = False
    ) -> List[Dict[str, Any]]:
        """
        Fetch emails using UIDs with batch optimization.

        All UIDs are requested in a single ``UID FETCH``.  When
        ``include_body`` is true the text section is fetched with
        ``BODY.PEEK[TEXT]`` and a ``preview`` key is added to each result.

        Returns:
            List of message dicts sorted by ``timestamp`` (newest first);
            an empty list when ``uids`` is empty or the fetch fails.
        """
        if not uids:
            return []

        try:
            uid_string = ','.join(uids)
            if include_body:
                fetch_cmd = '(RFC822.HEADER BODY.PEEK[TEXT] FLAGS UID)'
            else:
                fetch_cmd = '(RFC822.HEADER FLAGS UID)'

            status, data = self.connection.uid('FETCH', uid_string, fetch_cmd)
            if status != 'OK':
                return []

            emails = []
            # A message fetched with several literals arrives as several
            # tuples; merge them so each message is parsed exactly once.
            for response_part in self._merge_fetch_parts(data):
                email_data = self._parse_email_data(response_part, include_body)
                if email_data:
                    emails.append(email_data)

            emails.sort(key=lambda x: x.get('timestamp', 0), reverse=True)
            return emails

        except Exception as e:  # API boundary: report failure as "no results"
            logger.error("Failed to fetch emails by UIDs: %s", e)
            return []

    # ------------------------------------------------------------------
    # Search helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _quote_mailbox(folder: str) -> str:
        """Return ``folder`` as an IMAP quoted string when it needs quoting.

        Names already wrapped in double quotes are returned unchanged so
        callers that pre-quote keep working.
        """
        if len(folder) >= 2 and folder.startswith('"') and folder.endswith('"'):
            return folder
        if any(ch in _MAILBOX_SPECIALS for ch in folder):
            escaped = folder.replace('\\', '\\\\').replace('"', '\\"')
            return f'"{escaped}"'
        return folder

    def _run_uid_search(self, search_criteria: str) -> Tuple[str, List[Any]]:
        """Issue ``UID SEARCH`` for ``search_criteria`` with fallbacks.

        Non-ASCII criteria are sent as UTF-8 bytes together with
        ``CHARSET UTF-8``; if that raises, the bytes are retried without a
        charset.  If the search still raises, a plain ``ALL`` search is
        issued as a last resort (best-effort behavior kept from the original
        implementation).
        """
        try:
            if any(ord(c) > 127 for c in search_criteria):
                # imaplib encodes ``str`` arguments as ASCII, so non-ASCII
                # criteria must be passed as bytes.
                search_bytes = search_criteria.encode('utf-8')
                try:
                    return self.connection.uid(
                        'SEARCH', 'CHARSET', 'UTF-8', search_bytes
                    )
                except (imaplib.IMAP4.error, OSError) as e:
                    logger.warning(
                        "UTF-8 charset search failed: %s, trying with encoded bytes", e
                    )
                    return self.connection.uid('SEARCH', None, search_bytes)
            return self.connection.uid('SEARCH', None, search_criteria)
        except (imaplib.IMAP4.error, OSError) as e:
            logger.warning("UID search failed: %s, trying fallback", e)
            # NOTE(review): this returns every message in the folder, not
            # only matches - confirm callers really want that on failure.
            return self.connection.uid('SEARCH', None, 'ALL')

    def _build_search_criteria(
        self,
        query: str,
        search_in: str,
        unread_only: bool,
        date_from: Optional[str],
        date_to: Optional[str]
    ) -> str:
        """Build IMAP search criteria string.

        When neither date bound is given the search is limited to the last
        30 days for performance.  Returns ``'ALL'`` if no criterion applies.
        """
        criteria_parts = []

        if date_from:
            criteria_parts.append(f'SINCE {self._format_date(date_from)}')
        elif not date_to:
            default_since = datetime.now() - timedelta(days=30)
            criteria_parts.append(f'SINCE {self._to_imap_date(default_since)}')

        if date_to:
            # IMAP BEFORE is exclusive of the given day.
            criteria_parts.append(f'BEFORE {self._format_date(date_to)}')

        if unread_only:
            criteria_parts.append('UNSEEN')

        if query:
            encoded_query = self._encode_search_query(query)
            field_keys = {
                'subject': 'SUBJECT',
                'from': 'FROM',
                'to': 'TO',
                'body': 'BODY',
            }
            key = field_keys.get(search_in)
            if key is not None:
                criteria_parts.append(f'{key} "{encoded_query}"')
            elif any(ord(c) > 127 for c in encoded_query):
                # For non-ASCII text, use the simpler TEXT search which is
                # more compatible.
                criteria_parts.append(f'TEXT "{encoded_query}"')
            else:
                criteria_parts.append(
                    f'OR (OR SUBJECT "{encoded_query}" FROM "{encoded_query}") '
                    f'(OR TO "{encoded_query}" BODY "{encoded_query}")'
                )

        return ' '.join(criteria_parts) if criteria_parts else 'ALL'

    def _encode_search_query(self, query: str) -> str:
        """Make ``query`` safe to embed in an IMAP quoted string.

        Backslashes and double quotes are escaped and line breaks are
        replaced by a space so user input cannot terminate the quoted string
        or the command.  Non-ASCII characters (e.g. CJK) are returned as-is;
        their byte encoding is handled when the search is sent.
        """
        escaped = query.replace('\\', '\\\\').replace('"', '\\"')
        return re.sub(r'[\r\n]+', ' ', escaped)

    @staticmethod
    def _to_imap_date(dt: datetime) -> str:
        """Format ``dt`` as ``DD-Mon-YYYY`` with an English month name."""
        return f"{dt.day:02d}-{_IMAP_MONTHS[dt.month - 1]}-{dt.year:04d}"

    def _format_date(self, date_str: str) -> str:
        """Format date for IMAP search.

        Accepts ``YYYY-MM-DD``, ``DD-MM-YYYY`` and ``YYYY/MM/DD``.  Anything
        else is returned unchanged, on the assumption that the caller already
        supplied an IMAP-style date.
        """
        for fmt in _DATE_INPUT_FORMATS:
            try:
                dt = datetime.strptime(date_str, fmt)
            except (ValueError, TypeError):
                continue
            return self._to_imap_date(dt)
        return date_str

    # ------------------------------------------------------------------
    # Fetch / parse helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _merge_fetch_parts(data: Optional[List[Any]]) -> List[Tuple[bytes, bytes]]:
        """Group raw imaplib FETCH data into one ``(metadata, raw)`` per message.

        imaplib yields one tuple per literal followed by a bytes item holding
        the rest of the response line.  The descriptors (and that trailing
        remainder, where the server may put UID/FLAGS) are concatenated into
        ``metadata``; the header literal and the optional ``BODY[TEXT]``
        literal are concatenated into ``raw``.
        """
        merged: List[Tuple[bytes, bytes]] = []
        meta_chunks: List[bytes] = []
        header: Optional[bytes] = None
        body = b''

        # The trailing ``None`` is a sentinel that flushes the last message.
        for part in list(data or []) + [None]:
            if isinstance(part, tuple):
                descriptor, literal = part[0], part[1]
                if isinstance(descriptor, bytes):
                    meta_chunks.append(descriptor)
                if isinstance(literal, bytes):
                    is_body = (
                        isinstance(descriptor, bytes)
                        and _BODY_TEXT_LITERAL_RE.search(descriptor) is not None
                    )
                    if is_body:
                        body = literal
                    else:
                        header = literal
                continue

            # Any non-tuple item closes the current message.
            if isinstance(part, bytes):
                meta_chunks.append(part)
            if header is not None:
                merged.append((b''.join(meta_chunks), header + body))
            meta_chunks, header, body = [], None, b''

        return merged

    def _parse_email_data(
        self,
        response_part: Tuple,
        include_body: bool
    ) -> Optional[Dict[str, Any]]:
        """Parse email data from IMAP response.

        Args:
            response_part: ``(metadata, raw_message)`` where ``metadata``
                contains the ``UID``/``FLAGS`` items and ``raw_message`` the
                header bytes (optionally followed by the body text).
            include_body: Add a ``preview`` entry built from the body.

        Returns:
            Dict describing the message, or ``None`` when the part carries no
            bytes payload or cannot be parsed.
        """
        try:
            metadata, msg_data = response_part[0], response_part[1]
            if not isinstance(msg_data, bytes):
                return None

            msg = email.message_from_bytes(msg_data)

            uid_match = _UID_RE.search(metadata)
            uid = uid_match.group(1).decode() if uid_match else None

            flags_match = _FLAGS_RE.search(metadata)
            flags = flags_match.group(1).decode() if flags_match else ""
            is_read = '\\Seen' in flags
            is_flagged = '\\Flagged' in flags

            # ``str`` guards against Header objects returned for raw 8-bit
            # header values.
            date_str = str(msg['Date'] or '')
            try:
                date_tuple = email.utils.parsedate_tz(date_str)
                timestamp = email.utils.mktime_tz(date_tuple) if date_tuple else 0
            except (TypeError, ValueError, OverflowError):
                timestamp = 0

            email_data = {
                'uid': uid,
                'id': uid,  # Add id field for compatibility
                'subject': self._decode_header(msg['Subject'] or ''),
                'from': self._decode_header(msg['From'] or ''),
                'to': self._decode_header(msg['To'] or ''),
                'date': date_str,
                'timestamp': timestamp,
                'is_read': is_read,
                'unread': not is_read,  # Add unread field for compatibility
                'is_flagged': is_flagged,
                # Only meaningful when body parts were fetched; with headers
                # alone there are no parts to inspect.
                'has_attachments': self._has_attachments(msg)
            }

            if include_body:
                email_data['preview'] = self._get_email_preview(msg)

            return email_data

        except Exception as e:  # one bad message must not abort the batch
            logger.error("Failed to parse email data: %s", e)
            return None

    def _decode_header(self, header_value: str) -> str:
        """Decode email header handling various encodings.

        Chunks are concatenated as they appeared.  A single space is inserted
        only between an encoded-word and adjacent plain text when the
        boundary has no whitespace (for example after header unfolding).
        Falls back to ``str(header_value)`` if the header cannot be parsed.
        """
        if not header_value:
            return ""

        try:
            decoded_parts = decode_header(header_value)
        except (email.errors.MessageError, ValueError, TypeError, LookupError):
            return str(header_value)

        pieces: List[str] = []
        prev_encoded = False
        for part, encoding in decoded_parts:
            if isinstance(part, bytes):
                try:
                    text = part.decode(encoding) if encoding else None
                except (LookupError, UnicodeDecodeError):
                    text = None
                if text is None:
                    text = part.decode('utf-8', errors='ignore')
            else:
                text = str(part)
            if not text:
                continue

            is_encoded = bool(encoding)
            if pieces and is_encoded != prev_encoded:
                boundary_has_space = pieces[-1][-1:].isspace() or text[:1].isspace()
                if not boundary_has_space:
                    pieces.append(' ')
            pieces.append(text)
            prev_encoded = is_encoded

        return ''.join(pieces)

    def _has_attachments(self, msg: email.message.Message) -> bool:
        """Check if email has attachments (a part with disposition ``attachment``)."""
        return any(
            part.get_content_disposition() == 'attachment'
            for part in msg.walk()
        )

    def _get_email_preview(self, msg: email.message.Message, length: int = 200) -> str:
        """Get email body preview.

        Uses the first non-empty ``text/plain`` part, collapses whitespace
        and truncates to ``length`` characters followed by ``"..."``.
        Returns an empty string when no text is available or decoding fails.
        """
        try:
            body = ""
            for part in msg.walk():
                if part.get_content_type() != "text/plain":
                    continue
                payload = part.get_payload(decode=True)
                if not payload:
                    continue
                charset = part.get_content_charset() or 'utf-8'
                try:
                    body = payload.decode(charset, errors='ignore')
                except LookupError:
                    # Unknown charset label: fall back to UTF-8.
                    body = payload.decode('utf-8', errors='ignore')
                break

            body = re.sub(r'\s+', ' ', body).strip()
            if len(body) > length:
                body = body[:length] + "..."
            return body
        except Exception:  # preview is best-effort only
            logger.debug("Failed to build email preview", exc_info=True)
            return ""

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.