Email Processing MCP Server

by Cam10001110101
Verified
  • src
import win32com.client from datetime import datetime import pytz import pywintypes import re from EmailMetadata import EmailMetadata class OutlookConnector: def __init__(self, process_deleted_items=False): """ Initialize the Outlook connector. Args: process_deleted_items (bool): Whether to process emails from the Deleted Items folder """ self.process_deleted_items = process_deleted_items try: self.app = win32com.client.Dispatch("Outlook.Application") self.outlook = self.app.GetNamespace("MAPI") self.current_user = self.app.Session.CurrentUser except Exception as e: self.outlook = None self.app = None self.current_user = None def get_mailboxes(self): mailboxes = [] if self.outlook: try: for account in self.app.Session.Accounts: mailboxes.append(account) except Exception: pass return mailboxes def get_mailbox(self, mailbox_name): if self.outlook: try: for account in self.app.Session.Accounts: if account.DisplayName.lower() == mailbox_name.lower(): return account except Exception: pass return None @staticmethod def clean_email_body(body): """Clean email body by removing problematic content and escaping for JSON.""" if not body: return "" # Convert to string if not already body = str(body) # Remove problematic characters and normalize body = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', body) # Remove control characters body = re.sub(r'\r\n|\r|\n', ' ', body) # Normalize line endings to spaces body = re.sub(r'\s+', ' ', body) # Collapse whitespace # Remove email markers that could break JSON body = re.sub(r'From:.*?Sent:.*?(?=\w)', '', body, flags=re.IGNORECASE | re.DOTALL) body = re.sub(r'>{2,}.*?(?=\w)', '', body, flags=re.MULTILINE) body = re.sub(r'(-{3,}|_{3,}) ?Forwarded message ?(-{3,}|_{3,})', '', body) # Escape any remaining special characters body = body.replace('\\', '\\\\') body = body.replace('"', '\\"') body = body.replace('\t', ' ') return body.strip() def get_emails_within_date_range(self, folder_names, start_date, end_date, mailboxes): email_data = [] processed_count = 0 error_count = 0 skipped_folders = [] # Define folder IDs folder_ids = { "Inbox": 6, # olFolderInbox "Sent Items": 5, # olFolderSentMail "Deleted Items": 3 # olFolderDeletedItems } # Convert dates for filtering start_datetime = datetime.fromisoformat(start_date) end_datetime = datetime.fromisoformat(end_date) # Convert dates to UTC for comparison local_tz = pytz.timezone('America/Chicago') start_utc = local_tz.localize(datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0)).astimezone(pytz.UTC) end_utc = local_tz.localize(datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59)).astimezone(pytz.UTC) for account in mailboxes: try: store = account.DeliveryStore # Process each requested folder for folder_name in folder_names: # Skip Deleted Items folder if not enabled if folder_name == "Deleted Items" and not self.process_deleted_items: continue folder_id = folder_ids.get(folder_name) if not folder_id: continue folder = store.GetDefaultFolder(folder_id) if folder and folder.Items.Count > 0: items = folder.Items for i in range(1, items.Count + 1): try: email = items.Item(i) if hasattr(email, 'ReceivedTime'): # Check if email is within date range email_time = self.to_utc(email.ReceivedTime) if not (start_utc <= email_time <= end_utc): continue # Convert dates to UTC datetime objects received_datetime = self.to_utc(email.ReceivedTime) sent_datetime = self.to_utc(email.SentOn) if hasattr(email, 'SentOn') and email.SentOn else None # Get recipients to = getattr(email, 'To', '') if not isinstance(to, str): to = '; '.join(recipient.Name for recipient in email.Recipients if hasattr(recipient, 'Name')) # Get sender email sender_email = getattr(email, 'SenderEmailAddress', '') if '/O=EXCHANGELABS/' in sender_email.upper(): # Try to get from Recipients if hasattr(email, 'Recipients'): for i in range(1, email.Recipients.Count + 1): recipient = email.Recipients.Item(i) if hasattr(recipient, 'Type') and recipient.Type == 1: # 1 = To sender_email = getattr(recipient, 'Address', sender_email) break # Get attachments attachments = [attachment.FileName for attachment in email.Attachments] if hasattr(email, 'Attachments') and email.Attachments.Count > 0 else [] # Clean body body = self.clean_email_body(getattr(email, 'Body', '')) try: email_metadata = EmailMetadata( AccountName=account.DisplayName, Entry_ID=email.EntryID, Folder=folder_name, Subject=email.Subject, SenderName=getattr(email, 'SenderName', ''), SenderEmailAddress=sender_email, ReceivedTime=received_datetime, SentOn=sent_datetime, To=to, Body=body, Attachments=attachments, IsMarkedAsTask=getattr(email, 'IsMarkedAsTask', False), UnRead=getattr(email, 'UnRead', False), Categories='; '.join(email.Categories) if hasattr(email, 'Categories') else '' ) except Exception: raise email_data.append(email_metadata) processed_count += 1 except Exception: error_count += 1 continue except Exception: continue return email_data def to_utc(self, dt): try: if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None: local_tz = pytz.timezone('America/Chicago') local_dt = local_tz.localize(dt) return local_dt.astimezone(pytz.utc) else: return dt.astimezone(pytz.utc) except Exception: raise