We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/brysontang/DeltaTask'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
import uuid
import logging
from typing import List, Dict, Any, Optional, Set, Tuple
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from deltatask.models import Base, Todo, Tag
# Module-level logger shared by everything in this file; it is registered under
# the fixed name "DeltaTask" rather than the module's __name__.
logger = logging.getLogger("DeltaTask")
class DeltaTaskRepository:
    """Repository for database operations on tasks and tags.

    Every public method opens its own session through ``session_scope``, so
    each call runs as one transaction that is committed on success and rolled
    back if an exception escapes.
    """

    # Plain columns that ``update_todo`` copies straight from the updates dict.
    _UPDATABLE_FIELDS = (
        'title',
        'description',
        'deadline',
        'urgency',
        'effort',
        'completed',
        'parent_id',
    )

    def __init__(self, db_url: str = "sqlite:///deltatask.db"):
        """Initialize the repository with a database connection.

        Args:
            db_url: SQLAlchemy database URL passed to ``create_engine``.
        """
        self.engine = create_engine(db_url)
        self.Session = sessionmaker(bind=self.engine)
        # Create any tables registered on Base.metadata that do not exist yet.
        Base.metadata.create_all(self.engine)

    @contextmanager
    def session_scope(self):
        """Provide a transactional scope around a series of operations.

        Yields:
            A new session. It is committed when the ``with`` body finishes
            normally, rolled back (and the exception re-raised) when the body
            raises, and closed in either case.
        """
        session = self.Session()
        try:
            yield session
            session.commit()
            logger.debug("Database transaction committed successfully")
        except Exception as e:
            logger.error(f"Database transaction error: {e}", exc_info=True)
            session.rollback()
            logger.info("Database transaction rolled back")
            raise
        finally:
            session.close()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _sort_key(todo_dict: Dict[str, Any]) -> Tuple[bool, Any, Any, Any]:
        """Build the ordering key shared by ``get_todos`` and ``search_todos``.

        Order: todos with a deadline first (earliest first), then higher
        urgency first, then lower effort first. A missing or ``None`` urgency
        counts as 1 and a missing or ``None`` effort as 999, so incomplete rows
        sort instead of raising ``TypeError``.
        """
        deadline = todo_dict.get('deadline')
        urgency = todo_dict.get('urgency')
        effort = todo_dict.get('effort')
        return (
            deadline is None,  # None deadlines come last
            # The flag above already separates the two groups; '' just gives
            # every deadline-less row an identical, comparable placeholder.
            '' if deadline is None else deadline,
            -(1 if urgency is None else urgency),  # urgency, descending
            999 if effort is None else effort,  # effort, ascending
        )

    @staticmethod
    def _deadline_window(today=None, days: int = 7) -> Tuple[str, str]:
        """Return ``(start, end)`` ISO date strings for an upcoming window.

        Args:
            today: ``datetime.date`` the window starts on; defaults to the
                current local date.
            days: Length of the window in days.

        Returns:
            ISO-8601 strings for ``today`` and ``today + days``. Real date
            arithmetic is used so month and year boundaries roll over.
        """
        from datetime import date, timedelta

        if today is None:
            today = date.today()
        return today.isoformat(), (today + timedelta(days=days)).isoformat()

    @staticmethod
    def _resolve_tags(session: "Session", tag_names) -> List["Tag"]:
        """Return Tag objects for ``tag_names``, creating the missing ones.

        Names are de-duplicated (first occurrence wins) so the same tag is
        never attached to a todo twice. ``None`` or an empty iterable yields
        an empty list.
        """
        resolved = []
        for tag_name in dict.fromkeys(tag_names or ()):
            tag = session.query(Tag).filter(Tag.name == tag_name).first()
            if tag is None:
                tag = Tag(id=str(uuid.uuid4()), name=tag_name)
                session.add(tag)
            resolved.append(tag)
        return resolved

    @staticmethod
    def _delete_subtree(session: "Session", root: "Todo") -> None:
        """Delete ``root`` and all of its descendants inside ``session``.

        The whole subtree is collected first (a visited set guards against
        ``parent_id`` cycles), then rows are deleted deepest-first and flushed
        one by one so a child row is always removed before its parent.
        """
        seen = {root.id}
        ordered = [root]  # every parent precedes its descendants
        pending = [root]
        while pending:
            current = pending.pop()
            children = session.query(Todo).filter(Todo.parent_id == current.id).all()
            for child in children:
                if child.id not in seen:
                    seen.add(child.id)
                    ordered.append(child)
                    pending.append(child)

        for node in reversed(ordered):
            session.delete(node)
            session.flush()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def add_todo(self, todo_data: Dict[str, Any]) -> str:
        """Add a new todo to the database.

        Args:
            todo_data: Todo attributes. ``title`` is required; ``id``,
                ``description``, ``deadline``, ``urgency``, ``effort``,
                ``parent_id`` and ``tags`` (iterable of tag names) are
                optional.

        Returns:
            The ID of the stored todo (generated when none was supplied).

        Raises:
            KeyError: If ``todo_data`` has no ``title``.
        """
        with self.session_scope() as session:
            # `or` also covers an explicit ``'id': None`` / empty string, which
            # a plain ``.get('id', default)`` would pass through unchanged.
            todo_id = todo_data.get('id') or str(uuid.uuid4())

            todo = Todo(
                id=todo_id,
                title=todo_data['title'],
                description=todo_data.get('description', ''),
                deadline=todo_data.get('deadline'),
                urgency=todo_data.get('urgency', 1),
                effort=todo_data.get('effort', 1),
                parent_id=todo_data.get('parent_id'),
            )

            tag_names = todo_data.get('tags')
            if tag_names:
                todo.tags.extend(self._resolve_tags(session, tag_names))

            session.add(todo)
            return todo_id

    def get_todos(self, include_completed: bool = False,
                  parent_id: Optional[str] = None,
                  tags: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Get todos with optional filtering.

        Args:
            include_completed: When False (default) completed todos are
                filtered out.
            parent_id: When given, only todos with this parent are returned.
            tags: When non-empty, only todos carrying at least one of these
                tag names are returned.

        Returns:
            ``to_dict()`` representations of the matching todos, sorted by
            deadline, then urgency (descending), then effort (ascending).
            Subtasks are not expanded here.
        """
        with self.session_scope() as session:
            query = session.query(Todo)

            if not include_completed:
                # SQLAlchemy needs `==` here to build the SQL comparison.
                query = query.filter(Todo.completed == False)  # noqa: E712
            if parent_id is not None:
                query = query.filter(Todo.parent_id == parent_id)
            if tags:
                # distinct() collapses todos that match several of the tags.
                query = query.join(Todo.tags).filter(Tag.name.in_(tags)).distinct()

            result = [todo.to_dict() for todo in query.all()]
            result.sort(key=self._sort_key)
            return result

    def get_todo_by_id(self, todo_id: str) -> Optional[Dict[str, Any]]:
        """Get a specific todo by ID.

        Returns:
            The todo's ``to_dict()`` representation, or None if no todo has
            that ID.
        """
        with self.session_scope() as session:
            todo = session.query(Todo).filter(Todo.id == todo_id).first()
            if todo is None:
                return None
            return todo.to_dict()

    def update_todo(self, todo_id: str, updates: Dict[str, Any]) -> bool:
        """Update a todo with new values.

        Args:
            todo_id: ID of the todo to change.
            updates: Fields to overwrite. Only keys that are present are
                applied. A ``tags`` key replaces the todo's whole tag list
                (``None`` or an empty list clears it).

        Returns:
            True if the todo was found and updated, False if it does not exist.
        """
        with self.session_scope() as session:
            todo = session.query(Todo).filter(Todo.id == todo_id).first()
            if todo is None:
                return False

            for field in self._UPDATABLE_FIELDS:
                if field in updates:
                    setattr(todo, field, updates[field])

            if 'tags' in updates:
                # Replace, do not merge: the new list is the complete tag set.
                todo.tags = self._resolve_tags(session, updates['tags'])

            return True

    def delete_todo(self, todo_id: str, delete_subtasks: bool = True) -> bool:
        """Delete a todo and optionally its subtasks.

        Args:
            todo_id: ID of the todo to delete.
            delete_subtasks: When True, every descendant is deleted as well.
                When False, direct subtasks are kept and their ``parent_id``
                is set to None.

        Returns:
            True if the todo was found and deleted, False if it does not exist.

        The whole operation runs in one transaction, so a failure leaves the
        todo and all of its subtasks untouched.
        """
        with self.session_scope() as session:
            todo = session.query(Todo).filter(Todo.id == todo_id).first()
            if todo is None:
                return False

            if delete_subtasks:
                self._delete_subtree(session, todo)
            else:
                # Detach the subtasks so they survive as top-level todos.
                session.query(Todo).filter(Todo.parent_id == todo_id).update(
                    {"parent_id": None}
                )
                session.delete(todo)

            return True

    def search_todos(self, query: str) -> List[Dict[str, Any]]:
        """Search todos by title, description, or tags.

        Args:
            query: Substring to look for in todo titles, descriptions and
                tag names.

        Returns:
            ``to_dict()`` representations of every matching todo (each todo
            once), sorted by deadline, then urgency (descending), then effort
            (ascending).
        """
        with self.session_scope() as session:
            text_matches = session.query(Todo).filter(
                (Todo.title.contains(query)) |
                (Todo.description.contains(query))
            ).all()
            tag_matches = session.query(Todo).join(Todo.tags).filter(
                Tag.name.contains(query)
            ).all()

            # De-duplicate by ID while keeping the already-loaded objects, so
            # there is no need to re-query each todo and ties keep a stable
            # order.
            unique = {todo.id: todo for todo in text_matches + tag_matches}

            results = [todo.to_dict() for todo in unique.values()]
            results.sort(key=self._sort_key)
            return results

    def get_all_tags(self) -> List[str]:
        """Get the names of all stored tags."""
        with self.session_scope() as session:
            rows = session.query(Tag.name).all()
            # Each row is a one-element tuple holding the tag name.
            return [row[0] for row in rows]

    def get_statistics(self) -> Dict[str, Any]:
        """Get task statistics.

        Returns:
            A dict with:
              * ``total``: number of todos,
              * ``completed``: number of completed todos,
              * ``completion_rate``: completed share in percent (0 when there
                are no todos),
              * ``by_urgency``: open-todo count for each urgency 1-5,
              * ``upcoming_deadlines``: open todos whose deadline lies between
                today and seven days from today (inclusive).
        """
        with self.session_scope() as session:
            total = session.query(Todo).count()
            completed = session.query(Todo).filter(
                Todo.completed == True  # noqa: E712
            ).count()

            by_urgency = {
                urgency: session.query(Todo).filter(
                    Todo.completed == False,  # noqa: E712
                    Todo.urgency == urgency,
                ).count()
                for urgency in range(1, 6)
            }

            # Deadlines are compared as ISO date strings, as before.
            today, week_later = self._deadline_window()
            upcoming_deadlines = session.query(Todo).filter(
                Todo.completed == False,  # noqa: E712
                Todo.deadline.between(today, week_later),
            ).count()

            return {
                "total": total,
                "completed": completed,
                "completion_rate": (completed / total * 100) if total > 0 else 0,
                "by_urgency": by_urgency,
                "upcoming_deadlines": upcoming_deadlines,
            }