Email Processing MCP Server
by Cam10001110101
Verified
- src
import sqlite3
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
import json
import sys
import logging
import time
import os
from EmailMetadata import EmailMetadata
import logging
# Configure logging
logger = logging.getLogger('outlook-email.sqlite')


class SQLiteHandler:
    """
    Persist email metadata in a local SQLite database.

    All emails live in a single ``emails`` table keyed by the email's
    ``Entry_ID``. The table survives restarts; its ``processed`` flag records
    which emails have already had embeddings generated. The handler can also
    be used as a context manager that closes the connection on exit.
    """

    # Column name and SQL declaration for every column of the ``emails`` table.
    # Single source of truth for table creation, the in-place upgrade of
    # databases created with fewer columns, and the INSERT column list.
    _SCHEMA: Tuple[Tuple[str, str], ...] = (
        ('id', 'TEXT PRIMARY KEY'),
        ('account', 'TEXT NOT NULL'),
        ('folder', 'TEXT NOT NULL'),
        ('subject', 'TEXT'),
        ('sender_name', 'TEXT'),
        ('sender_email', 'TEXT'),
        ('received_time', 'DATETIME'),
        ('sent_time', 'DATETIME'),
        ('recipients', 'TEXT'),
        ('is_task', 'BOOLEAN'),
        ('unread', 'BOOLEAN'),
        ('categories', 'TEXT'),
        ('processed', 'BOOLEAN DEFAULT FALSE'),
        ('last_updated', 'DATETIME'),
        ('body', 'TEXT'),
        ('attachments', 'TEXT'),
    )

    def __init__(self, db_path: str) -> None:
        """
        Open (or create) the SQLite database and ensure the schema exists.

        Existing data in the database is preserved.

        Args:
            db_path (str): Path to the SQLite database file, or ``":memory:"``
                for a throwaway in-memory database. Missing parent
                directories are created.

        Raises:
            Exception: Any error raised while connecting or creating tables
                is logged and re-raised.
        """
        try:
            logger.info(f"Initializing SQLite at {db_path}")
            self.db_path = db_path
            self.conn = self._create_connection()
            self.conn.row_factory = sqlite3.Row
            self._create_tables()
            logger.info("SQLite initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing SQLite: {str(e)}", exc_info=True)
            raise

    def _create_connection(self, max_retries: int = 3) -> sqlite3.Connection:
        """
        Create the database connection, retrying failed attempts.

        Args:
            max_retries (int): Total number of connection attempts. Values
                below 1 are treated as 1 so a connection is always attempted.

        Returns:
            sqlite3.Connection: Open connection to ``self.db_path``.

        Raises:
            Exception: The error from the last attempt once all attempts fail.
        """
        # dirname() is '' for a bare filename or ":memory:", and
        # os.makedirs('') raises, so only create a directory that is named.
        db_dir = os.path.dirname(self.db_path)
        if db_dir and not os.path.exists(db_dir):
            logger.info(f"Creating directory: {db_dir}")
            os.makedirs(db_dir, exist_ok=True)

        attempts = max(1, max_retries)
        for attempt in range(attempts):
            try:
                # A non-None isolation_level keeps the sqlite3 module out of
                # autocommit mode. It opens a "BEGIN IMMEDIATE" transaction
                # before data-modifying statements, taking the write lock up
                # front instead of on first write.
                return sqlite3.connect(
                    self.db_path,
                    timeout=30.0,  # wait up to 30 s for a competing lock
                    isolation_level="IMMEDIATE",
                )
            except Exception as e:
                if attempt == attempts - 1:
                    raise
                logger.warning(f"Retry {attempt + 1}/{max_retries} connecting to SQLite: {str(e)}")
                time.sleep(1)
        raise RuntimeError("unreachable: connection loop always returns or raises")

    def _create_tables(self) -> None:
        """
        Create the ``emails`` table and its indices if they don't exist.

        Existing rows are never dropped. If the table was created by an
        older version with fewer columns, any missing column that
        ``ALTER TABLE ... ADD COLUMN`` can add (i.e. not PRIMARY KEY or
        NOT NULL) is added, so inserts naming every column keep working.
        """
        cursor = self.conn.cursor()
        columns_sql = ',\n'.join(f'{name} {decl}' for name, decl in self._SCHEMA)
        cursor.execute(f'CREATE TABLE IF NOT EXISTS emails (\n{columns_sql}\n)')

        # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
        existing = {row[1] for row in cursor.execute('PRAGMA table_info(emails)')}
        for name, decl in self._SCHEMA:
            if name in existing or 'PRIMARY KEY' in decl or 'NOT NULL' in decl:
                continue
            logger.info(f"Adding missing column '{name}' to emails table")
            cursor.execute(f'ALTER TABLE emails ADD COLUMN {name} {decl}')

        # Indices come after the column upgrade because they reference columns
        # that an older table may have lacked.
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_folder ON emails(folder)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_received_time ON emails(received_time)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_processed ON emails(processed)')
        self.conn.commit()

    def add_or_update_email(self, email: 'EmailMetadata', cursor: Optional[sqlite3.Cursor] = None) -> bool:
        """
        Insert an email into the database unless one with the same ID exists.

        Despite the name, an existing row is left untouched, which
        preserves its ``processed`` flag; the call still reports success.

        Args:
            email (EmailMetadata): Email metadata to store; must provide
                ``to_dict()``.
            cursor (Optional[sqlite3.Cursor]): Cursor to execute with. When
                omitted, a new cursor on this handler's connection is used.
                The insert is committed on the cursor's connection.

        Returns:
            bool: True if the email was inserted or was already present.
            False if it could not be converted, lacks a required field, or a
            database error occurred (the reason is logged).
        """
        try:
            if cursor is None:
                cursor = self.conn.cursor()
            data = self._email_to_row(email)
            if data is None:
                return False
            return self._insert_if_absent(cursor, data)
        except Exception as e:
            logger.error(f"Error adding/updating email: {str(e)}", exc_info=True)
            self.conn.rollback()
            return False

    def _email_to_row(self, email: 'EmailMetadata') -> Optional[Dict[str, Any]]:
        """
        Convert *email* into a dict of ``emails`` column values.

        Returns:
            Optional[Dict[str, Any]]: Column name to value, or None (with the
            reason logged) if conversion fails or a required field is empty.
        """
        try:
            email_dict = email.to_dict()
            logger.debug(f"Processing email: {email_dict.get('Subject', 'No Subject')}")
        except Exception as e:
            logger.error(f"Error converting email to dict: {str(e)}")
            return None

        try:
            received_time = email_dict.get('ReceivedTime')
            sent_time = email_dict.get('SentOn')
            # Store datetimes as ISO-8601 text.
            if isinstance(received_time, datetime):
                received_time = received_time.isoformat()
            if isinstance(sent_time, datetime):
                sent_time = sent_time.isoformat()

            data = {
                'id': email_dict.get('Entry_ID'),
                'account': email_dict.get('AccountName'),
                'folder': email_dict.get('Folder'),
                'subject': email_dict.get('Subject'),
                'sender_name': email_dict.get('SenderName'),
                'sender_email': email_dict.get('SenderEmailAddress'),
                'received_time': received_time,
                'sent_time': sent_time,
                'recipients': email_dict.get('To'),
                'is_task': bool(email_dict.get('IsMarkedAsTask')),
                'unread': bool(email_dict.get('UnRead')),
                'categories': email_dict.get('Categories'),
                'processed': bool(email_dict.get('embedding')),
                'last_updated': datetime.now().isoformat(),
                'body': email_dict.get('Body'),
                'attachments': email_dict.get('Attachments', ''),
            }
        except Exception as e:
            logger.error(f"Error preparing data for SQLite: {str(e)}")
            return None

        required_fields = ('id', 'account', 'folder', 'subject', 'received_time', 'body')
        missing_fields = [field for field in required_fields if not data[field]]
        if missing_fields:
            logger.warning(f"Missing required fields: {', '.join(missing_fields)}")
            return None
        return data

    def _insert_if_absent(self, cursor: sqlite3.Cursor, data: Dict[str, Any], max_retries: int = 3) -> bool:
        """
        Insert *data* unless its ``id`` exists, retrying while the DB is locked.

        Returns:
            bool: True once the row was inserted or found to exist already.

        Raises:
            sqlite3.Error: Any database error other than a lock that clears
                within ``max_retries`` attempts (after rolling back).
        """
        conn = cursor.connection
        columns = [name for name, _ in self._SCHEMA]
        # OR IGNORE makes "skip if the id exists" and the insert a single
        # atomic statement. account/folder are validated non-empty beforehand,
        # so the only constraint it can ignore is the primary key.
        insert_sql = (
            f"INSERT OR IGNORE INTO emails ({', '.join(columns)}) "
            f"VALUES ({', '.join(':' + name for name in columns)})"
        )
        for attempt in range(max_retries):
            try:
                cursor.execute(insert_sql, data)
                inserted = cursor.rowcount > 0  # 0 when the row was ignored
                conn.commit()
            except sqlite3.OperationalError as e:
                # Connection.rollback() is a no-op without an open transaction,
                # unlike a raw ROLLBACK statement, which would raise and mask e.
                conn.rollback()
                if "database is locked" in str(e) and attempt < max_retries - 1:
                    logger.warning(f"Database locked, retry {attempt + 1}/{max_retries}")
                    time.sleep(1)
                    continue
                logger.error(f"SQLite operational error: {str(e)}")
                raise
            except Exception as e:
                conn.rollback()
                logger.error(f"Unexpected error: {str(e)}")
                raise

            if inserted:
                logger.info(f"Successfully added email {data['id']}")
            else:
                logger.info(f"Email {data['id']} already exists, skipping")
            return True
        return False  # only reachable when max_retries < 1

    def get_unprocessed_emails(self, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Get emails that haven't been processed (no embeddings generated).

        Args:
            limit (int): Maximum number of emails to return.

        Returns:
            List[Dict]: Unprocessed emails, newest ``received_time`` first,
            with keys renamed to the ``to_dict()`` field names (e.g.
            ``Subject``, ``ReceivedTime``). An empty list on error (logged).
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                SELECT
                    id,
                    account as AccountName,
                    folder as Folder,
                    subject as Subject,
                    sender_name as SenderName,
                    sender_email as SenderEmailAddress,
                    received_time as ReceivedTime,
                    sent_time as SentOn,
                    recipients as "To",
                    body as Body,
                    COALESCE(attachments, '') as Attachments,
                    is_task as IsMarkedAsTask,
                    unread as UnRead,
                    categories as Categories
                FROM emails
                WHERE processed = FALSE
                ORDER BY received_time DESC
                LIMIT ?
            ''', (limit,))
            return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error getting unprocessed emails: {str(e)}", exc_info=True)
            return []

    def mark_as_processed(self, email_id: str) -> bool:
        """
        Mark an email as processed after generating its embedding.

        Args:
            email_id (str): ID of the email to mark.

        Returns:
            bool: True if the UPDATE ran and committed without error; this is
            also True when no email has that ID. False on error (logged).
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                UPDATE emails
                SET processed = TRUE,
                    last_updated = ?
                WHERE id = ?
            ''', (datetime.now().isoformat(), email_id))
            self.conn.commit()
            return True
        except Exception as e:
            logger.error(f"Error marking email as processed: {str(e)}", exc_info=True)
            self.conn.rollback()
            return False

    def get_email_by_id(self, email_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a specific email by ID.

        Args:
            email_id (str): ID of the email to retrieve.

        Returns:
            Optional[Dict]: All columns of the row keyed by column name, or
            None if no such email exists or an error occurred (logged).
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('SELECT * FROM emails WHERE id = ?', (email_id,))
            row = cursor.fetchone()
            return dict(row) if row else None
        except Exception as e:
            logger.error(f"Error getting email by ID: {str(e)}", exc_info=True)
            return None

    def get_email_count(self) -> int:
        """
        Get the total number of emails in the database.

        Returns:
            int: Number of rows in ``emails``, or 0 on error (logged).
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('SELECT COUNT(*) FROM emails')
            return cursor.fetchone()[0]
        except Exception as e:
            logger.error(f"Error getting email count: {str(e)}", exc_info=True)
            return 0

    def close(self) -> None:
        """Close the database connection, logging (not raising) any error."""
        try:
            # hasattr guards against __init__ having failed before conn was set.
            if hasattr(self, 'conn') and self.conn:
                self.conn.close()
                logger.info("SQLite connection closed")
        except Exception as e:
            logger.error(f"Error closing database: {str(e)}", exc_info=True)

    def __del__(self) -> None:
        """Close the connection when the object is garbage collected."""
        self.close()

    def __enter__(self) -> 'SQLiteHandler':
        """Enter the context manager and return this handler."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the connection; exceptions from the ``with`` body propagate."""
        self.close()