# task_repository.py (9.48 kB)
import uuid
import logging
from typing import List, Dict, Any, Optional, Set, Tuple
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from deltatask.models import Base, Todo, Tag
# Module-level logger registered under the "DeltaTask" name; the repository's
# session_scope() reports commit and rollback events through it.
logger = logging.getLogger("DeltaTask")
class DeltaTaskRepository:
    """Repository for database operations on tasks and tags.

    Every public method opens its own session through ``session_scope`` and
    returns plain values (ids, booleans, or whatever ``Todo.to_dict()``
    produces) rather than live ORM objects.
    """

    # Number of days after "today" that still count as an upcoming deadline.
    _UPCOMING_DAYS = 7

    def __init__(self, db_url: str = "sqlite:///deltatask.db"):
        """Initialize the repository with a database connection.

        Args:
            db_url: SQLAlchemy database URL. All tables declared on ``Base``
                are created on the target database if they do not exist.
        """
        self.engine = create_engine(db_url)
        self.Session = sessionmaker(bind=self.engine)
        Base.metadata.create_all(self.engine)

    @contextmanager
    def session_scope(self):
        """Provide a transactional scope around a series of operations.

        Yields a new session, commits when the ``with`` body finishes
        normally, rolls back and re-raises on any exception, and always
        closes the session afterwards.
        """
        session = self.Session()
        try:
            yield session
            session.commit()
            logger.debug("Database transaction committed successfully")
        except Exception as e:
            logger.error(f"Database transaction error: {e}", exc_info=True)
            session.rollback()
            logger.info("Database transaction rolled back")
            raise
        finally:
            session.close()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _sort_key(todo_dict: Dict[str, Any]) -> Tuple[bool, Any, Any, Any]:
        """Return the ordering key shared by ``get_todos`` and ``search_todos``.

        Order: todos with a deadline first (earliest deadline first), then
        higher urgency first, then lower effort first. Missing or ``None``
        values fall back to urgency 1 and effort 999 so that incomplete
        records cannot make the sort raise ``TypeError``.
        """
        deadline = todo_dict.get('deadline')
        urgency = todo_dict.get('urgency')
        effort = todo_dict.get('effort')
        return (
            deadline is None,  # None deadlines come last
            # The placeholder is only ever compared against itself, because
            # the first element already separates the two groups.
            '9999-12-31' if deadline is None else deadline,
            -(1 if urgency is None else urgency),  # urgency descending
            999 if effort is None else effort,  # effort ascending
        )

    @staticmethod
    def _upcoming_deadline_window(today, days: int = 7) -> Tuple[str, str]:
        """Return ``(start, end)`` ISO date strings for the upcoming window.

        Args:
            today: A ``datetime.date`` marking the start of the window.
            days: Length of the window in days.

        Returns:
            ISO-8601 strings for ``today`` and ``today + days``; real date
            arithmetic is used so month and year boundaries roll over
            correctly.
        """
        from datetime import timedelta

        return today.isoformat(), (today + timedelta(days=days)).isoformat()

    @staticmethod
    def _get_or_create_tags(session: "Session", tag_names) -> List[Any]:
        """Return ``Tag`` objects for ``tag_names``, creating missing ones.

        Repeated names are collapsed so the same tag is never attached to a
        todo twice. ``None`` is treated as an empty list. Newly created tags
        are added to ``session``.
        """
        tags = []
        seen = set()
        for tag_name in tag_names or ():
            if tag_name in seen:
                continue
            seen.add(tag_name)
            tag = session.query(Tag).filter(Tag.name == tag_name).first()
            if tag is None:
                tag = Tag(id=str(uuid.uuid4()), name=tag_name)
                session.add(tag)
            tags.append(tag)
        return tags

    @staticmethod
    def _collect_descendants(session: "Session", root_id: str) -> List[Any]:
        """Return every todo below ``root_id``, parents before their children.

        The walk is iterative and tracks visited ids, so a corrupt
        ``parent_id`` cycle cannot cause infinite recursion.
        """
        descendants = []
        visited = {root_id}
        pending = [root_id]
        while pending:
            parent_id = pending.pop()
            children = session.query(Todo).filter(Todo.parent_id == parent_id).all()
            for child in children:
                if child.id in visited:
                    continue
                visited.add(child.id)
                descendants.append(child)
                pending.append(child.id)
        return descendants

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def add_todo(self, todo_data: Dict[str, Any]) -> str:
        """Add a new todo to the database.

        Args:
            todo_data: Mapping with a required ``'title'`` and optional
                ``'id'``, ``'description'``, ``'deadline'``, ``'urgency'``,
                ``'effort'``, ``'parent_id'`` and ``'tags'`` (tag names).

        Returns:
            The id of the created todo. A UUID4 string is generated when no
            non-empty ``'id'`` is supplied.

        Raises:
            KeyError: If ``'title'`` is missing from ``todo_data``.
        """
        with self.session_scope() as session:
            # Generate a UUID only when no usable id was provided.
            todo_id = todo_data.get('id') or str(uuid.uuid4())

            todo = Todo(
                id=todo_id,
                title=todo_data['title'],
                description=todo_data.get('description', ''),
                deadline=todo_data.get('deadline'),
                urgency=todo_data.get('urgency', 1),
                effort=todo_data.get('effort', 1),
                parent_id=todo_data.get('parent_id')
            )

            for tag in self._get_or_create_tags(session, todo_data.get('tags')):
                todo.tags.append(tag)

            session.add(todo)
            return todo_id

    def get_todos(self, include_completed: bool = False,
                 parent_id: Optional[str] = None,
                 tags: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Get todos with optional filtering.

        Args:
            include_completed: When False, only todos whose ``completed``
                flag is false are returned.
            parent_id: When not None, only direct children of this todo.
            tags: When non-empty, only todos carrying at least one of these
                tag names.

        Returns:
            ``Todo.to_dict()`` results sorted by deadline (todos without a
            deadline last), then urgency descending, then effort ascending.
            Subtasks are not expanded here.
        """
        with self.session_scope() as session:
            query = session.query(Todo)

            if not include_completed:
                query = query.filter(Todo.completed == False)  # noqa: E712

            if parent_id is not None:
                query = query.filter(Todo.parent_id == parent_id)

            if tags:
                # distinct() collapses the duplicate rows the join produces
                # when a todo matches more than one requested tag.
                query = query.join(Todo.tags).filter(Tag.name.in_(tags)).distinct()

            result = [todo.to_dict() for todo in query.all()]
            result.sort(key=self._sort_key)
            return result

    def get_todo_by_id(self, todo_id: str) -> Optional[Dict[str, Any]]:
        """Get a specific todo by ID.

        Returns:
            The todo's ``to_dict()`` representation, or None if no todo has
            this id.
        """
        with self.session_scope() as session:
            todo = session.query(Todo).filter(Todo.id == todo_id).first()
            if not todo:
                return None
            return todo.to_dict()

    def update_todo(self, todo_id: str, updates: Dict[str, Any]) -> bool:
        """Update a todo with new values.

        Args:
            todo_id: Id of the todo to modify.
            updates: Any subset of ``'title'``, ``'description'``,
                ``'deadline'``, ``'urgency'``, ``'effort'``, ``'completed'``,
                ``'parent_id'`` and ``'tags'``. Only the keys present are
                applied; ``'tags'`` replaces the whole tag set (``None`` or
                an empty list removes all tags).

        Returns:
            True if the todo exists and was updated, False otherwise.
        """
        with self.session_scope() as session:
            todo = session.query(Todo).filter(Todo.id == todo_id).first()
            if not todo:
                return False

            simple_fields = (
                'title', 'description', 'deadline', 'urgency',
                'effort', 'completed', 'parent_id',
            )
            for field in simple_fields:
                if field in updates:
                    setattr(todo, field, updates[field])

            if 'tags' in updates:
                # Assigning the full list replaces the existing associations.
                todo.tags = self._get_or_create_tags(session, updates['tags'])

            return True

    def delete_todo(self, todo_id: str, delete_subtasks: bool = True) -> bool:
        """Delete a todo and optionally its subtasks.

        The whole operation runs in one session and one transaction, so
        either the todo and all affected subtasks change together or nothing
        does.

        Args:
            todo_id: Id of the todo to delete.
            delete_subtasks: When True, every descendant is deleted as well;
                when False, direct children are kept and their
                ``parent_id`` is cleared.

        Returns:
            True if the todo existed and was deleted, False otherwise.
        """
        with self.session_scope() as session:
            todo = session.query(Todo).filter(Todo.id == todo_id).first()
            if not todo:
                return False

            if delete_subtasks:
                # Delete deepest todos first and flush each one so the DELETE
                # statements are emitted children-before-parents.
                for descendant in reversed(self._collect_descendants(session, todo_id)):
                    session.delete(descendant)
                    session.flush()
            else:
                # Update subtasks to remove parent reference
                session.query(Todo).filter(Todo.parent_id == todo_id).update({"parent_id": None})

            session.delete(todo)
            return True

    def search_todos(self, query: str) -> List[Dict[str, Any]]:
        """Search todos by title, description, or tags.

        Args:
            query: Substring to look for in todo titles, descriptions and
                tag names.

        Returns:
            ``to_dict()`` results for each matching todo exactly once, in
            the same order ``get_todos`` uses.
        """
        with self.session_scope() as session:
            text_matches = session.query(Todo).filter(
                (Todo.title.contains(query)) |
                (Todo.description.contains(query))
            ).all()

            tag_matches = session.query(Todo).join(Todo.tags).filter(Tag.name.contains(query)).all()

            # De-duplicate by id while keeping first-seen order; the objects
            # are already loaded, so no per-id re-query is needed.
            unique_todos = {}
            for todo in text_matches + tag_matches:
                unique_todos.setdefault(todo.id, todo)

            results = [todo.to_dict() for todo in unique_todos.values()]
            results.sort(key=self._sort_key)
            return results

    def get_all_tags(self) -> List[str]:
        """Get the names of all stored tags."""
        with self.session_scope() as session:
            return [row[0] for row in session.query(Tag.name).all()]

    def get_statistics(self) -> Dict[str, Any]:
        """Get task statistics.

        Returns:
            A dict with:
              * ``total``: number of todos;
              * ``completed``: number of completed todos;
              * ``completion_rate``: completed percentage (0 when there are
                no todos);
              * ``by_urgency``: open-todo counts keyed by urgency 1..5;
              * ``upcoming_deadlines``: open todos whose deadline lies
                between today and seven days from now, inclusive.
        """
        from datetime import datetime

        with self.session_scope() as session:
            total = session.query(Todo).count()
            completed = session.query(Todo).filter(Todo.completed == True).count()  # noqa: E712

            by_urgency = {
                urgency: session.query(Todo).filter(
                    Todo.completed == False,  # noqa: E712
                    Todo.urgency == urgency,
                ).count()
                for urgency in range(1, 6)
            }

            # Deadlines are compared as ISO date strings, matching how the
            # bounds are built here.
            today, week_later = self._upcoming_deadline_window(
                datetime.now().date(), self._UPCOMING_DAYS
            )
            upcoming_deadlines = session.query(Todo).filter(
                Todo.completed == False,  # noqa: E712
                Todo.deadline.between(today, week_later)
            ).count()

            return {
                "total": total,
                "completed": completed,
                "completion_rate": (completed / total * 100) if total > 0 else 0,
                "by_urgency": by_urgency,
                "upcoming_deadlines": upcoming_deadlines
            }