# SQLiteHandler.py
import sqlite3
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
import json
import sys
import logging
import time
import os
from EmailMetadata import EmailMetadata
import logging
# Module-level logger for the SQLite handler. This file only obtains the named
# logger; it does not attach handlers or set levels itself.
logger = logging.getLogger('outlook-email.sqlite')
class SQLiteHandler:
    """
    Thin wrapper around a single SQLite connection that stores email metadata
    in an ``emails`` table.

    The connection is opened in ``__init__`` and released by ``close()``,
    which is also invoked by the context-manager protocol and the destructor.
    """

    # Fields that must be non-empty for an email to be stored.
    _REQUIRED_FIELDS = ('id', 'account', 'folder', 'subject', 'received_time', 'body')

    # Number of attempts for a write that hits "database is locked".
    _MAX_WRITE_RETRIES = 3

    def __init__(self, db_path: str) -> None:
        """
        Initialize SQLite database connection and create tables if they don't exist.

        Args:
            db_path (str): Path to SQLite database file

        Raises:
            Exception: Any error raised while connecting or creating the
                schema is logged and re-raised.
        """
        try:
            logger.info(f"Initializing SQLite at {db_path}")
            self.db_path = db_path
            self._closed = False
            self.conn = self._create_connection()
            # Rows behave like mappings so they can be converted with dict(row).
            self.conn.row_factory = sqlite3.Row
            self._create_tables()
            logger.info("SQLite initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing SQLite: {str(e)}", exc_info=True)
            # Do not leave a half-initialized connection open.
            self.close()
            raise

    def _create_connection(self, max_retries: int = 3) -> sqlite3.Connection:
        """
        Create database connection with retry logic.

        Args:
            max_retries (int): Number of connection attempts; values below 1
                are treated as 1.

        Returns:
            sqlite3.Connection: The opened connection

        Raises:
            sqlite3.Error: If the last connection attempt fails.
        """
        db_dir = os.path.dirname(self.db_path)
        # dirname is '' for a bare file name or ':memory:'; os.makedirs('')
        # would raise, so only create a directory when there is one to create.
        if db_dir and not os.path.exists(db_dir):
            logger.info(f"Creating directory: {db_dir}")
            os.makedirs(db_dir, exist_ok=True)

        attempts = max(1, max_retries)
        for attempt in range(attempts):
            try:
                # Use isolation_level with a value instead of None to avoid
                # autocommit mode, which can cause locking issues.
                return sqlite3.connect(
                    self.db_path,
                    timeout=30.0,  # 30 second timeout
                    isolation_level="IMMEDIATE",
                )
            except sqlite3.Error as e:
                if attempt == attempts - 1:
                    raise
                logger.warning(f"Retry {attempt + 1}/{attempts} connecting to SQLite: {str(e)}")
                time.sleep(1)

    def _create_tables(self) -> None:
        """
        Create necessary database tables and indices if they don't exist.

        An existing ``emails`` table is left untouched so stored rows survive
        re-opening the database; no schema migration is performed.
        """
        cursor = self.conn.cursor()

        # Create main emails table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS emails (
            id TEXT PRIMARY KEY,
            account TEXT NOT NULL,
            folder TEXT NOT NULL,
            subject TEXT,
            sender_name TEXT,
            sender_email TEXT,
            received_time DATETIME,
            sent_time DATETIME,
            recipients TEXT,
            is_task BOOLEAN,
            unread BOOLEAN,
            categories TEXT,
            processed BOOLEAN DEFAULT FALSE,
            last_updated DATETIME,
            body TEXT,
            attachments TEXT
        )
        ''')

        # Create optimized indices
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_folder ON emails(folder)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_received_time ON emails(received_time)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_processed ON emails(processed)')

        self.conn.commit()

    def _rollback_quietly(self, conn: Optional[sqlite3.Connection] = None) -> None:
        """
        Roll back any open transaction without raising.

        Used from error handlers, where a failing rollback must not mask the
        original error.

        Args:
            conn (Optional[sqlite3.Connection]): Connection to roll back;
                defaults to this handler's connection.
        """
        if conn is None:
            conn = getattr(self, 'conn', None)
        if conn is None:
            return
        try:
            # Connection.rollback() does nothing when no transaction is open.
            conn.rollback()
        except sqlite3.Error as e:
            logger.warning(f"Rollback failed: {str(e)}")

    def _build_row(self, email: "EmailMetadata") -> Optional[Dict[str, Any]]:
        """
        Convert an email into the parameter dict used by the INSERT statement.

        Args:
            email (EmailMetadata): Email metadata exposing ``to_dict()``

        Returns:
            Optional[Dict]: Column values keyed by column name, or None if the
            conversion failed or a required field is empty.
        """
        try:
            email_dict = email.to_dict()
            logger.debug(f"Processing email: {email_dict.get('Subject', 'No Subject')}")
        except Exception as e:
            logger.error(f"Error converting email to dict: {str(e)}")
            return None

        try:
            # Convert datetime objects to ISO format strings
            received_time = email_dict.get('ReceivedTime')
            sent_time = email_dict.get('SentOn')
            if isinstance(received_time, datetime):
                received_time = received_time.isoformat()
            if isinstance(sent_time, datetime):
                sent_time = sent_time.isoformat()

            data = {
                'id': email_dict.get('Entry_ID'),
                'account': email_dict.get('AccountName'),
                'folder': email_dict.get('Folder'),
                'subject': email_dict.get('Subject'),
                'sender_name': email_dict.get('SenderName'),
                'sender_email': email_dict.get('SenderEmailAddress'),
                'received_time': received_time,
                'sent_time': sent_time,
                'recipients': email_dict.get('To'),
                'is_task': bool(email_dict.get('IsMarkedAsTask')),
                'unread': bool(email_dict.get('UnRead')),
                'categories': email_dict.get('Categories'),
                # An email counts as processed when it carries an 'embedding' value.
                'processed': bool(email_dict.get('embedding')),
                'last_updated': datetime.now().isoformat(),
                'body': email_dict.get('Body'),
                'attachments': email_dict.get('Attachments', ''),
            }
        except Exception as e:
            logger.error(f"Error preparing data for SQLite: {str(e)}")
            return None

        # Validate required fields (empty strings count as missing)
        missing_fields = [field for field in self._REQUIRED_FIELDS if not data[field]]
        if missing_fields:
            logger.warning(f"Missing required fields: {', '.join(missing_fields)}")
            return None
        return data

    def _insert_if_absent(self, cursor: sqlite3.Cursor, data: Dict[str, Any]) -> bool:
        """
        Insert one row unless a row with the same id exists, retrying on lock.

        The existence check and the insert run inside one IMMEDIATE
        transaction that this method begins and commits itself.

        Args:
            cursor (sqlite3.Cursor): Cursor to execute the statements on
            data (Dict[str, Any]): Column values as produced by ``_build_row``

        Returns:
            bool: True if the row was inserted or already existed

        Raises:
            sqlite3.OperationalError: If the database stays locked after the
                last retry, or on any other operational error.
        """
        # Commit/rollback on the connection that owns the cursor.
        conn = cursor.connection
        max_retries = self._MAX_WRITE_RETRIES

        for attempt in range(max_retries):
            try:
                cursor.execute('BEGIN IMMEDIATE')
                cursor.execute('SELECT id FROM emails WHERE id = ?', (data['id'],))
                if cursor.fetchone() is not None:
                    logger.info(f"Email {data['id']} already exists, skipping")
                    conn.commit()
                    return True

                # Insert new email
                cursor.execute('''
                INSERT INTO emails (
                    id, account, folder, subject, sender_name, sender_email,
                    received_time, sent_time, recipients, is_task, unread,
                    categories, processed, last_updated, body, attachments
                ) VALUES (
                    :id, :account, :folder, :subject, :sender_name, :sender_email,
                    :received_time, :sent_time, :recipients, :is_task, :unread,
                    :categories, :processed, :last_updated, :body, :attachments
                )
                ''', data)

                conn.commit()
                logger.info(f"Successfully added email {data['id']}")
                return True

            except sqlite3.OperationalError as e:
                # BEGIN itself may have failed, in which case no transaction is
                # open; an explicit 'ROLLBACK' statement would raise here and
                # defeat the retry, so use the tolerant rollback instead.
                self._rollback_quietly(conn)
                if "database is locked" in str(e) and attempt < max_retries - 1:
                    logger.warning(f"Database locked, retry {attempt + 1}/{max_retries}")
                    time.sleep(1)
                    continue
                logger.error(f"SQLite operational error: {str(e)}")
                raise
            except Exception as e:
                self._rollback_quietly(conn)
                logger.error(f"Unexpected error: {str(e)}")
                raise

        return False

    def add_or_update_email(self, email: "EmailMetadata", cursor: Optional[sqlite3.Cursor] = None) -> bool:
        """
        Add an email to the database.

        Despite the name, an existing row with the same id is left unchanged
        and the call still reports success. The method begins and commits its
        own transaction, so it should be called when no transaction is already
        open on the cursor's connection.

        Args:
            email (EmailMetadata): Email metadata to store
            cursor (Optional[sqlite3.Cursor]): Optional cursor to execute on;
                a new cursor on this handler's connection is used when omitted

        Returns:
            bool: True if the email was stored or already present, False if
            conversion, validation or the database write failed
        """
        try:
            # Use provided cursor or create new one
            if cursor is None:
                cursor = self.conn.cursor()

            data = self._build_row(email)
            if data is None:
                return False

            return self._insert_if_absent(cursor, data)

        except Exception as e:
            logger.error(f"Error adding/updating email: {str(e)}", exc_info=True)
            self._rollback_quietly()
            return False

    def get_unprocessed_emails(self, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Get emails that haven't been processed (no embeddings generated).

        Rows are returned newest first by received time, with columns aliased
        back to the key names used when the email was stored.

        Args:
            limit (int): Maximum number of emails to return

        Returns:
            List[Dict]: List of unprocessed emails; empty on error
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
            SELECT 
                id,
                account as AccountName,
                folder as Folder,
                subject as Subject,
                sender_name as SenderName,
                sender_email as SenderEmailAddress,
                received_time as ReceivedTime,
                sent_time as SentOn,
                recipients as "To",
                body as Body,
                COALESCE(attachments, '') as Attachments,
                is_task as IsMarkedAsTask,
                unread as UnRead,
                categories as Categories
            FROM emails 
            WHERE processed = FALSE 
            ORDER BY received_time DESC 
            LIMIT ?
            ''', (limit,))

            return [dict(row) for row in cursor.fetchall()]

        except Exception as e:
            logger.error(f"Error getting unprocessed emails: {str(e)}", exc_info=True)
            return []

    def mark_as_processed(self, email_id: str) -> bool:
        """
        Mark an email as processed after generating its embedding.

        Args:
            email_id (str): ID of the email to mark

        Returns:
            bool: True if the update statement ran and was committed (even
            when no row matched the id), False on error
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
            UPDATE emails 
            SET processed = TRUE, 
                last_updated = ? 
            WHERE id = ?
            ''', (datetime.now().isoformat(), email_id))

            self.conn.commit()
            if cursor.rowcount == 0:
                # Surface silent no-ops without changing the return contract.
                logger.warning(f"No email with id {email_id} found to mark as processed")
            return True

        except Exception as e:
            logger.error(f"Error marking email as processed: {str(e)}", exc_info=True)
            self._rollback_quietly()
            return False

    def get_email_by_id(self, email_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a specific email by ID.

        Args:
            email_id (str): ID of the email to retrieve

        Returns:
            Optional[Dict]: Email data if found, None if absent or on error
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('SELECT * FROM emails WHERE id = ?', (email_id,))
            row = cursor.fetchone()
            return dict(row) if row else None

        except Exception as e:
            logger.error(f"Error getting email by ID: {str(e)}", exc_info=True)
            return None

    def get_email_count(self) -> int:
        """
        Get total number of emails in database.

        Returns:
            int: Number of emails; 0 on error
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute('SELECT COUNT(*) FROM emails')
            return cursor.fetchone()[0]

        except Exception as e:
            logger.error(f"Error getting email count: {str(e)}", exc_info=True)
            return 0

    def close(self) -> None:
        """
        Close the database connection.

        Safe to call more than once and on a partially initialized instance;
        only the first successful call closes and logs.
        """
        conn = getattr(self, 'conn', None)
        if conn is None or getattr(self, '_closed', False):
            return
        try:
            conn.close()
            self._closed = True
            logger.info("SQLite connection closed")
        except Exception as e:
            logger.error(f"Error closing database: {str(e)}", exc_info=True)

    def __del__(self) -> None:
        """Destructor to ensure connection is closed when object is garbage collected."""
        try:
            self.close()
        except Exception:
            # During interpreter shutdown module globals (e.g. the logger) may
            # already be gone; there is nothing useful left to do.
            pass

    def __enter__(self) -> 'SQLiteHandler':
        """Enter context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit context manager and close connection."""
        self.close()