DeltaTask MCP Server

import uuid import logging from typing import List, Dict, Any, Optional, Set, Tuple from contextlib import contextmanager from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, Session from deltatask.models import Base, Todo, Tag # Get logger logger = logging.getLogger("DeltaTask") class DeltaTaskRepository: """Repository for database operations on tasks and tags.""" def __init__(self, db_url: str = "sqlite:///deltatask.db"): """Initialize the repository with a database connection.""" self.engine = create_engine(db_url) self.Session = sessionmaker(bind=self.engine) Base.metadata.create_all(self.engine) @contextmanager def session_scope(self): """Provide a transactional scope around a series of operations.""" session = self.Session() try: yield session session.commit() logger.debug("Database transaction committed successfully") except Exception as e: logger.error(f"Database transaction error: {e}", exc_info=True) session.rollback() logger.info("Database transaction rolled back") raise finally: session.close() def add_todo(self, todo_data: Dict[str, Any]) -> str: """Add a new todo to the database.""" with self.session_scope() as session: # Generate a UUID if not provided todo_id = todo_data.get('id', str(uuid.uuid4())) # Create the Todo object todo = Todo( id=todo_id, title=todo_data['title'], description=todo_data.get('description', ''), deadline=todo_data.get('deadline'), urgency=todo_data.get('urgency', 1), effort=todo_data.get('effort', 1), parent_id=todo_data.get('parent_id') ) # Handle tags if 'tags' in todo_data and todo_data['tags']: for tag_name in todo_data['tags']: # Check if tag exists tag = session.query(Tag).filter(Tag.name == tag_name).first() if not tag: # Create new tag tag = Tag(id=str(uuid.uuid4()), name=tag_name) session.add(tag) todo.tags.append(tag) session.add(todo) return todo_id def get_todos(self, include_completed: bool = False, parent_id: Optional[str] = None, tags: List[str] = None) -> List[Dict[str, Any]]: """Get todos with optional filtering.""" with self.session_scope() as session: query = session.query(Todo) # Apply filters if not include_completed: query = query.filter(Todo.completed == False) if parent_id is not None: query = query.filter(Todo.parent_id == parent_id) if tags: query = query.join(Todo.tags).filter(Tag.name.in_(tags)).distinct() todos = query.all() # Convert to dicts result = [] for todo in todos: todo_dict = todo.to_dict() # We don't need to recursively get subtasks here as we'll do it in the service layer result.append(todo_dict) # Sort by deadline, urgency, and effort result.sort(key=lambda x: ( x.get('deadline') is None, # None deadlines come last x.get('deadline', '9999-12-31'), # Then sort by deadline -x.get('urgency', 1), # Then by urgency (descending) x.get('effort', 999) # Then by effort (ascending) )) return result def get_todo_by_id(self, todo_id: str) -> Optional[Dict[str, Any]]: """Get a specific todo by ID.""" with self.session_scope() as session: todo = session.query(Todo).filter(Todo.id == todo_id).first() if not todo: return None return todo.to_dict() def update_todo(self, todo_id: str, updates: Dict[str, Any]) -> bool: """Update a todo with new values.""" with self.session_scope() as session: todo = session.query(Todo).filter(Todo.id == todo_id).first() if not todo: return False # Update simple fields if 'title' in updates: todo.title = updates['title'] if 'description' in updates: todo.description = updates['description'] if 'deadline' in updates: todo.deadline = updates['deadline'] if 'urgency' in updates: todo.urgency = updates['urgency'] if 'effort' in updates: todo.effort = updates['effort'] if 'completed' in updates: todo.completed = updates['completed'] if 'parent_id' in updates: todo.parent_id = updates['parent_id'] # Handle tags update if 'tags' in updates: # Clear existing tags todo.tags = [] # Add new tags for tag_name in updates['tags']: tag = session.query(Tag).filter(Tag.name == tag_name).first() if not tag: tag = Tag(id=str(uuid.uuid4()), name=tag_name) session.add(tag) todo.tags.append(tag) return True def delete_todo(self, todo_id: str, delete_subtasks: bool = True) -> bool: """Delete a todo and optionally its subtasks.""" with self.session_scope() as session: todo = session.query(Todo).filter(Todo.id == todo_id).first() if not todo: return False if delete_subtasks: # Recursively delete all subtasks subtasks = session.query(Todo).filter(Todo.parent_id == todo_id).all() for subtask in subtasks: self.delete_todo(subtask.id, True) else: # Update subtasks to remove parent reference session.query(Todo).filter(Todo.parent_id == todo_id).update({"parent_id": None}) # Delete the todo session.delete(todo) return True def search_todos(self, query: str) -> List[Dict[str, Any]]: """Search todos by title, description, or tags.""" with self.session_scope() as session: # Search todos with title or description containing the query todos = session.query(Todo).filter( (Todo.title.contains(query)) | (Todo.description.contains(query)) ).all() # Also search in tags tag_todos = session.query(Todo).join(Todo.tags).filter(Tag.name.contains(query)).all() # Combine results and remove duplicates all_todos = set([todo.id for todo in todos] + [todo.id for todo in tag_todos]) # Fetch full todos with their relationships results = [] for todo_id in all_todos: todo = session.query(Todo).filter(Todo.id == todo_id).first() if todo: results.append(todo.to_dict()) # Sort by deadline, urgency, and effort results.sort(key=lambda x: ( x.get('deadline') is None, x.get('deadline', '9999-12-31'), -x.get('urgency', 1), x.get('effort', 999) )) return results def get_all_tags(self) -> List[str]: """Get all unique tag names.""" with self.session_scope() as session: tags = session.query(Tag.name).all() return [tag[0] for tag in tags] def get_statistics(self) -> Dict[str, Any]: """Get task statistics.""" with self.session_scope() as session: total = session.query(Todo).count() completed = session.query(Todo).filter(Todo.completed == True).count() # Count by urgency by_urgency = {} for urgency in range(1, 6): count = session.query(Todo).filter(Todo.completed == False, Todo.urgency == urgency).count() by_urgency[urgency] = count # Count upcoming deadlines from datetime import datetime today = datetime.now().date().isoformat() week_later = today.replace(today[:8], str(int(today[8:]) + 7)) upcoming_deadlines = session.query(Todo).filter( Todo.completed == False, Todo.deadline.between(today, week_later) ).count() return { "total": total, "completed": completed, "completion_rate": (completed / total * 100) if total > 0 else 0, "by_urgency": by_urgency, "upcoming_deadlines": upcoming_deadlines }