Skip to main content
Glama

DeltaTask MCP Server

by brysontang
task_service.py15.7 kB
import os import shutil import uuid import logging from typing import List, Dict, Any, Optional, Set, Tuple, Union from datetime import datetime from deltatask.repositories import DeltaTaskRepository from deltatask.services.obsidian_service import ObsidianMarkdownManager # Get logger logger = logging.getLogger("DeltaTask") class TaskService: """Service layer that abstracts and coordinates between database and markdown files.""" def __init__(self, db_url: str = "sqlite:///deltatask.db", vault_path: str = "TaskVault"): """Initialize with database and markdown managers.""" self.repository = DeltaTaskRepository(db_url) self.markdown_manager = ObsidianMarkdownManager(vault_path) def _ensure_id(self, task_data: Dict[str, Any]) -> str: """Ensure the task has an ID, generating one if needed.""" if 'id' not in task_data: task_data['id'] = str(uuid.uuid4()) return task_data['id'] def _recursively_add_subtasks(self, task: Dict[str, Any]) -> Dict[str, Any]: """Recursively add subtasks to a task dictionary.""" subtasks = self.repository.get_todos(include_completed=True, parent_id=task['id']) for subtask in subtasks: self._recursively_add_subtasks(subtask) task['subtasks'] = subtasks return task def _update_all_views(self) -> None: """Update all markdown views based on current database state.""" all_tasks = self.get_all_tasks(include_completed=True) self.markdown_manager.update_task_views(all_tasks) # Update statistics stats = self.repository.get_statistics() self.markdown_manager.create_statistics_file(stats) # Update tag index all_tags = set(self.repository.get_all_tags()) self.markdown_manager._create_or_update_index(all_tags) def add_task(self, task_data: Dict[str, Any]) -> Dict[str, Any]: """Add a new task and return its details.""" logger.info(f"Adding new task: {task_data.get('title', 'Untitled')}") try: # Ensure task has an ID task_id = self._ensure_id(task_data) # Validate fibonacci sequence for effort valid_efforts = [1, 2, 3, 5, 8, 13, 21] if 'effort' in task_data and task_data['effort'] not in valid_efforts: error_msg = f"Effort must be a Fibonacci number from {valid_efforts}" logger.error(error_msg) raise ValueError(error_msg) # Validate urgency if 'urgency' in task_data and not 1 <= task_data['urgency'] <= 5: error_msg = "Urgency must be between 1 and 5" logger.error(error_msg) raise ValueError(error_msg) try: # Add to database self.repository.add_todo(task_data) logger.info(f"Task {task_id} added to database") except Exception as e: logger.error(f"Failed to add task to database: {e}", exc_info=True) raise try: # Create markdown file self.markdown_manager.create_task_file(task_data) logger.info(f"Markdown file created for task {task_id}") except Exception as e: logger.error(f"Failed to create markdown file for task {task_id}: {e}", exc_info=True) # Continue even if markdown fails - we already have the data in the database try: # Update views self._update_all_views() logger.info("Task views updated") except Exception as e: logger.error(f"Failed to update task views: {e}", exc_info=True) # Continue even if views update fails return {"id": task_id, "message": "Task created successfully"} except Exception as e: logger.error(f"Error adding task: {e}", exc_info=True) return {"error": str(e)} def get_all_tasks(self, include_completed: bool = False, parent_id: Optional[str] = None, tags: List[str] = None) -> List[Dict[str, Any]]: """Get all tasks with optional filtering and their subtasks.""" # Get tasks from database tasks = self.repository.get_todos(include_completed, parent_id, tags) # Only add subtasks for top-level tasks if parent_id is None: for task in tasks: self._recursively_add_subtasks(task) return tasks def get_task_by_id(self, task_id: str) -> Dict[str, Any]: """Get a specific task by ID with its subtasks.""" task = self.repository.get_todo_by_id(task_id) if not task: return {"error": "Task not found"} # Add subtasks self._recursively_add_subtasks(task) return task def update_task_by_id(self, task_id: str, updates: Dict[str, Any]) -> Dict[str, Any]: """Update a task and return success status.""" logger.info(f"Updating task {task_id} with: {updates}") try: # Check if task exists existing_task = self.repository.get_todo_by_id(task_id) if not existing_task: logger.warning(f"Attempted to update non-existent task: {task_id}") return {"error": "Task not found"} # Validate fibonacci sequence for effort valid_efforts = [1, 2, 3, 5, 8, 13, 21] if 'effort' in updates and updates['effort'] not in valid_efforts: error_msg = f"Effort must be a Fibonacci number from {valid_efforts}" logger.error(f"Invalid effort value {updates['effort']} for task {task_id}") raise ValueError(error_msg) # Validate urgency if 'urgency' in updates and not 1 <= updates['urgency'] <= 5: error_msg = "Urgency must be between 1 and 5" logger.error(f"Invalid urgency value {updates['urgency']} for task {task_id}") raise ValueError(error_msg) try: # Update in database success = self.repository.update_todo(task_id, updates) if not success: logger.error(f"Database update failed for task {task_id}") return {"error": "Failed to update task in database"} logger.info(f"Task {task_id} updated in database") except Exception as e: logger.error(f"Error updating task {task_id} in database: {e}", exc_info=True) raise try: # Get updated task updated_task = self.repository.get_todo_by_id(task_id) if not updated_task: logger.error(f"Could not retrieve updated task {task_id}") return {"error": "Failed to retrieve updated task"} # Update markdown file self.markdown_manager.update_task_file(updated_task) logger.info(f"Markdown file updated for task {task_id}") except Exception as e: logger.error(f"Error updating markdown for task {task_id}: {e}", exc_info=True) # Continue even if markdown update fails try: # Update views self._update_all_views() logger.info("Task views updated after task update") except Exception as e: logger.error(f"Error updating views after task update: {e}", exc_info=True) # Continue even if views update fails return {"message": "Task updated successfully"} except ValueError as e: # Handle validation errors logger.error(f"Validation error updating task {task_id}: {e}", exc_info=True) return {"error": str(e)} except Exception as e: # Handle other errors logger.error(f"Unexpected error updating task {task_id}: {e}", exc_info=True) return {"error": f"Failed to update task: {str(e)}"} def delete_task_by_id(self, task_id: str, delete_subtasks: bool = True) -> Dict[str, Any]: """Delete a task and return success status.""" logger.info(f"Deleting task {task_id} (with subtasks: {delete_subtasks})") try: # Check if task exists existing_task = self.repository.get_todo_by_id(task_id) if not existing_task: logger.warning(f"Attempted to delete non-existent task: {task_id}") return {"error": "Task not found"} try: # Delete from database success = self.repository.delete_todo(task_id, delete_subtasks) if not success: logger.error(f"Database deletion failed for task {task_id}") return {"error": "Failed to delete task from database"} logger.info(f"Task {task_id} deleted from database") except Exception as e: logger.error(f"Error deleting task {task_id} from database: {e}", exc_info=True) raise try: # Delete markdown file self.markdown_manager.delete_task_file(task_id) logger.info(f"Markdown file deleted for task {task_id}") except Exception as e: logger.error(f"Error deleting markdown for task {task_id}: {e}", exc_info=True) # Continue even if markdown deletion fails try: # Update views self._update_all_views() logger.info("Task views updated after task deletion") except Exception as e: logger.error(f"Error updating views after task deletion: {e}", exc_info=True) # Continue even if views update fails return {"message": "Task deleted successfully"} except Exception as e: # Handle other errors logger.error(f"Unexpected error deleting task {task_id}: {e}", exc_info=True) return {"error": f"Failed to delete task: {str(e)}"} def create_subtasks(self, task_id: str, subtasks: List[Dict[str, Any]]) -> Dict[str, Any]: """Create subtasks for a task and return their IDs.""" # Check if parent task exists parent_task = self.repository.get_todo_by_id(task_id) if not parent_task: return {"error": "Parent task not found"} subtask_ids = [] # Create each subtask for subtask in subtasks: subtask['parent_id'] = task_id subtask_id = self._ensure_id(subtask) self.add_task(subtask) subtask_ids.append(subtask_id) return { "message": f"Created {len(subtask_ids)} subtasks", "subtask_ids": subtask_ids } def search(self, query: str) -> List[Dict[str, Any]]: """Search tasks and return matches.""" results = self.repository.search_todos(query) # Add subtasks for task in results: self._recursively_add_subtasks(task) return results def get_all_tags(self) -> List[str]: """Get all unique tag names.""" return self.repository.get_all_tags() def get_statistics(self) -> Dict[str, Any]: """Get task statistics.""" return self.repository.get_statistics() def sync_from_obsidian(self) -> Dict[str, Any]: """Sync changes from Obsidian markdown files back to the database.""" logger.info("Starting sync from Obsidian to database") try: # Get all tasks from markdown files markdown_tasks = self.markdown_manager.sync_from_markdown() if not markdown_tasks: logger.info("No markdown tasks found for syncing") return {"message": "No tasks found for syncing", "count": 0} # Track statistics updated_count = 0 error_count = 0 # Process each task for task_data in markdown_tasks: try: task_id = task_data["id"] # Check if task exists existing_task = self.repository.get_todo_by_id(task_id) if existing_task: # Update existing task success = self.repository.update_todo(task_id, task_data) if success: logger.info(f"Updated task {task_id} from markdown") updated_count += 1 else: logger.error(f"Failed to update task {task_id} from markdown") error_count += 1 else: # Add new task self.repository.add_todo(task_data) logger.info(f"Added new task {task_id} from markdown") updated_count += 1 except Exception as e: logger.error(f"Error syncing task {task_data.get('id', 'unknown')}: {e}", exc_info=True) error_count += 1 # Update all views to reflect changes try: self._update_all_views() except Exception as e: logger.error(f"Error updating views after sync: {e}", exc_info=True) result = { "message": "Sync completed", "updated": updated_count, "errors": error_count, "total": len(markdown_tasks) } logger.info(f"Sync completed: {result}") return result except Exception as e: logger.error(f"Error in Obsidian sync process: {e}", exc_info=True) return {"error": f"Sync failed: {str(e)}"} def reset(self) -> Dict[str, Any]: """Reset both the database and markdown files (for testing/development).""" logger.warning("Resetting entire task system") try: # Reset database by recreating tables Base.metadata.drop_all(self.repository.engine) Base.metadata.create_all(self.repository.engine) logger.info("Database reset complete") # Reset markdown files if os.path.exists(self.markdown_manager.vault_path): shutil.rmtree(self.markdown_manager.vault_path) logger.info(f"Removed vault directory: {self.markdown_manager.vault_path}") self.markdown_manager._ensure_vault_exists() logger.info("Created new vault directory") return {"message": "Task system reset successfully"} except Exception as e: logger.error(f"Error resetting task system: {e}", exc_info=True) return {"error": f"Reset failed: {str(e)}"}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brysontang/DeltaTask'

If you have feedback or need assistance with the MCP directory API, please join our Discord server