Skip to main content
Glama
brysontang

DeltaTask MCP Server

by brysontang

update_task

Modify existing tasks in DeltaTask MCP Server by providing task ID and update details to change task properties, status, or organization.

Instructions

Update an existing task.

Input Schema

Table | JSON Schema

| Name | Required | Description | Default |
|------|----------|-------------|---------|
| task_id | Yes | | |
| updates | Yes | | |

Implementation Reference

  • server.py:47-50 (handler)
    The primary MCP tool handler for 'update_task'. This async function is decorated with @mcp.tool() for automatic registration and executes the tool logic by delegating to TaskService.update_task_by_id.
    @mcp.tool()
    async def update_task(task_id: str, updates: dict[str, Any]) -> dict[str, Any]:
        """Update an existing task."""
        # Thin MCP entry point: the call is forwarded unchanged to
        # service.update_task_by_id and its result dict is returned as-is.
        # NOTE(review): the service call is synchronous (not awaited) inside an
        # async handler — confirm that blocking here is acceptable.
        return service.update_task_by_id(task_id, updates)
  • Core handler implementing the update_task logic: validates updates, persists to database via repository, syncs markdown file via ObsidianMarkdownManager.update_task_file, and refreshes all task views.
    def update_task_by_id(self, task_id: str, updates: Dict[str, Any]) -> Dict[str, Any]:
        """Update a task and return a status dict.

        The update is validated, written through the repository, then
        mirrored to the task's markdown file and the aggregated views.
        Markdown and view failures are logged and otherwise ignored; every
        other failure is reported through the returned dict.

        Args:
            task_id: Identifier of the task to update.
            updates: Field names mapped to their new values. 'effort' must be
                one of 1, 2, 3, 5, 8, 13, 21 and 'urgency' must lie between
                1 and 5; all other keys are handed to the repository as-is.

        Returns:
            {"message": "Task updated successfully"} on success, otherwise
            {"error": <description>}. Exceptions are caught and reported
            through the "error" key rather than raised.
        """
        logger.info(f"Updating task {task_id} with: {updates}")

        try:
            # Check if task exists
            existing_task = self.repository.get_todo_by_id(task_id)
            if not existing_task:
                logger.warning(f"Attempted to update non-existent task: {task_id}")
                return {"error": "Task not found"}

            # Validate fibonacci sequence for effort
            valid_efforts = [1, 2, 3, 5, 8, 13, 21]
            if 'effort' in updates and updates['effort'] not in valid_efforts:
                error_msg = f"Effort must be a Fibonacci number from {valid_efforts}"
                logger.error(f"Invalid effort value {updates['effort']} for task {task_id}")
                raise ValueError(error_msg)

            # Validate urgency
            if 'urgency' in updates:
                urgency = updates['urgency']
                try:
                    urgency_ok = 1 <= urgency <= 5
                except TypeError:
                    # Values that cannot be ordered against ints (None, "high",
                    # a list, ...) are invalid input, not an unexpected failure.
                    urgency_ok = False
                if not urgency_ok:
                    error_msg = "Urgency must be between 1 and 5"
                    logger.error(f"Invalid urgency value {urgency} for task {task_id}")
                    raise ValueError(error_msg)

            try:
                # Update in database
                success = self.repository.update_todo(task_id, updates)

                if not success:
                    logger.error(f"Database update failed for task {task_id}")
                    return {"error": "Failed to update task in database"}

                logger.info(f"Task {task_id} updated in database")
            except Exception as e:
                logger.error(f"Error updating task {task_id} in database: {e}", exc_info=True)
                raise

            try:
                # Get updated task
                updated_task = self.repository.get_todo_by_id(task_id)
                if not updated_task:
                    logger.error(f"Could not retrieve updated task {task_id}")
                    return {"error": "Failed to retrieve updated task"}

                # Update markdown file
                self.markdown_manager.update_task_file(updated_task)
                logger.info(f"Markdown file updated for task {task_id}")
            except Exception as e:
                logger.error(f"Error updating markdown for task {task_id}: {e}", exc_info=True)
                # Continue even if markdown update fails

            try:
                # Update views
                self._update_all_views()
                logger.info("Task views updated after task update")
            except Exception as e:
                logger.error(f"Error updating views after task update: {e}", exc_info=True)
                # Continue even if views update fails

            return {"message": "Task updated successfully"}

        except ValueError as e:
            # Handle validation errors
            logger.error(f"Validation error updating task {task_id}: {e}", exc_info=True)
            return {"error": str(e)}
        except Exception as e:
            # Handle other errors
            logger.error(f"Unexpected error updating task {task_id}: {e}", exc_info=True)
            return {"error": f"Failed to update task: {str(e)}"}
  • Helper function called during task update to synchronize the task's individual markdown file with updated data, handling frontmatter, content, links, and tag updates.
    def update_task_file(self, task: Dict[str, Any]) -> None:
        """Synchronise a task's markdown file with the given task data.

        Looks for "<id> - <sanitized title>.md" under <vault_path>/tasks,
        falls back to the old "<id>.md" name, and creates the file through
        create_task_file when neither exists. For an existing file the
        frontmatter fields, the description part of the body and the tag
        files are updated.

        Args:
            task: Task data. "id" and "title" are required; "updated",
                "urgency", "effort", "completed", "deadline", "description"
                and "tags" are applied when present.

        Raises:
            Exception: Any error other than frontmatter.FrontmatterError is
                logged and re-raised.
        """
        # First try with the new format (ID - title)
        sanitized_title = self._sanitize_filename(task["title"])
        task_file = os.path.join(self.vault_path, "tasks", f"{task['id']} - {sanitized_title}.md")
        # NOTE(review): this lookup uses the title from `task`; if the file on
        # disk was named after a previous title it is not found here and the
        # code falls through to the branches below — confirm that renamed
        # tasks do not leave a stale "<id> - <old title>.md" behind.

        # If not found, try the old format (just ID)
        if not os.path.exists(task_file):
            old_format_file = os.path.join(self.vault_path, "tasks", f"{task['id']}.md")
            if os.path.exists(old_format_file):
                logger.info(f"Found task file in old format, using: {old_format_file}")
                task_file = old_format_file
            else:
                # If file doesn't exist in either format, create it
                logger.info(f"Task file {task['id']} not found, creating new file")
                self.create_task_file(task)
                return

        try:
            post = frontmatter.load(task_file)

            # Update frontmatter fields
            old_title = post.get("title", "")
            new_title = task["title"]
            post["title"] = new_title
            post["updated"] = task.get("updated", datetime.now().isoformat())
            post["urgency"] = task.get("urgency", post.get("urgency", 1))
            post["effort"] = task.get("effort", post.get("effort", 1))
            post["completed"] = task.get("completed", post.get("completed", False))

            # A missing or None deadline means "no deadline": drop the key
            # instead of writing a null value into the frontmatter.
            deadline = task.get("deadline")
            if deadline is not None:
                post["deadline"] = deadline
            elif "deadline" in post:
                del post["deadline"]

            # If title has changed, update links in all child tasks
            if old_title != new_title:
                self._update_child_parent_links(task["id"], new_title)

            # Handle description separately
            if "description" in task:
                # A None description is treated as empty text.
                description = task["description"] or ""
                # Preserve the subtasks and related sections. Only the first
                # marker separates description from the rest, so everything
                # after it is kept even if the marker text occurs again.
                _, marker, remainder = post.content.partition("## Subtasks")
                if marker:
                    post.content = description + "\n\n## Subtasks" + remainder
                else:
                    post.content = description + "\n\n## Subtasks\n\n\n\n## Related\n\n"
                    logger.warning(f"Couldn't find Subtasks section in {task['id']}, recreating structure")

            # Check for tags update to update tag files. Empty/None tag values
            # are normalised to [] so the diff loops below can iterate them.
            old_tags = post.get('tags') or []
            new_tags = (task['tags'] or []) if 'tags' in task else old_tags

            # Update tags in frontmatter
            if "tags" in task:
                post["tags"] = task["tags"]

            try:
                # Write back to file
                with open(task_file, "wb") as f:
                    frontmatter.dump(post, f)
                logger.info(f"Updated task file {task['id']}")
            except IOError as e:
                logger.error(f"Error writing to task file {task_file}: {e}", exc_info=True)
                raise

            # Update tag files if tags changed
            if new_tags != old_tags:
                logger.info(f"Tags changed for task {task['id']}, updating tag files")
                # Remove from old tags
                for tag in old_tags:
                    if tag not in new_tags:
                        self._remove_task_from_tag(tag, task["id"])

                # Add to new tags
                for tag in new_tags:
                    if tag not in old_tags:
                        self._update_tag_files([tag], task["id"], task["title"])

        # NOTE(review): confirm the imported frontmatter module really exposes
        # FrontmatterError; if it does not, evaluating this clause raises
        # AttributeError and hides the original exception.
        except frontmatter.FrontmatterError as e:
            logger.error(f"Frontmatter error for task {task['id']}: {e}", exc_info=True)
            # Attempt recovery by recreating the file
            logger.info(f"Attempting to recreate task file {task['id']}")
            self.create_task_file(task)
        except Exception as e:
            logger.error(f"Error updating task file {task['id']}: {e}", exc_info=True)
            raise
  • Helper function that regenerates aggregated markdown views (all, urgent, today, overdue) after task updates to reflect current state.
    def update_task_views(self, tasks: List[Dict[str, Any]]) -> None:
        """Regenerate the aggregated task view files from ``tasks``.

        Rewrites all.md, urgent.md, today.md and overdue.md under
        <vault_path>/tasks. Files are written as UTF-8 because the views
        contain emoji, which the platform default encoding may not be able
        to encode.

        Args:
            tasks: Task dicts. "id" and "title" are required; "completed",
                "urgency" and "deadline" are optional. Deadlines are compared
                as strings against today's ISO date.

        I/O errors are logged per view and do not stop the remaining views.
        Any other exception raised while building the overdue view is logged
        as well; in the first three views it propagates to the caller.
        """
        logger.info("Updating task view files")
        today = datetime.now().date().isoformat()

        def task_link(task: Dict[str, Any]) -> str:
            # Wiki link matching the "<id> - <sanitized title>" file name.
            sanitized_title = self._sanitize_filename(task['title'])
            return f"[[tasks/{task['id']} - {sanitized_title}]]"

        def due_suffix(task: Dict[str, Any]) -> str:
            # A deadline key holding None/"" means no deadline: show nothing
            # rather than "(Due: None)".
            deadline = task.get('deadline')
            return f" (Due: {deadline})" if deadline else ""

        try:
            # All tasks view
            all_tasks_path = os.path.join(self.vault_path, "tasks", "all.md")
            with open(all_tasks_path, "w", encoding="utf-8") as f:
                f.write("# All Tasks\n\n")

                if tasks:
                    for task in tasks:
                        completed = "✅ " if task.get('completed', False) else ""
                        f.write(f"- {completed}{task_link(task)}{due_suffix(task)}\n")
                else:
                    f.write("No tasks found.\n")
            logger.info("Updated All Tasks view")
        except IOError as e:
            logger.error(f"Error updating All Tasks view: {e}", exc_info=True)

        try:
            # Urgent tasks view. Filter before opening so a failure while
            # filtering does not leave a truncated file behind.
            urgent_tasks = [t for t in tasks if not t.get('completed', False) and t.get('urgency', 1) >= 4]
            urgent_tasks_path = os.path.join(self.vault_path, "tasks", "urgent.md")
            with open(urgent_tasks_path, "w", encoding="utf-8") as f:
                f.write("# Urgent Tasks\n\n")

                if urgent_tasks:
                    for task in urgent_tasks:
                        urgency = "🔥" * task.get('urgency', 1)
                        f.write(f"- {urgency} {task_link(task)}{due_suffix(task)}\n")
                else:
                    f.write("No urgent tasks found.\n")
            logger.info(f"Updated Urgent Tasks view with {len(urgent_tasks)} tasks")
        except IOError as e:
            logger.error(f"Error updating Urgent Tasks view: {e}", exc_info=True)

        try:
            # Today's tasks
            today_tasks = [t for t in tasks if not t.get('completed', False) and t.get('deadline') == today]
            today_tasks_path = os.path.join(self.vault_path, "tasks", "today.md")
            with open(today_tasks_path, "w", encoding="utf-8") as f:
                f.write("# Due Today\n\n")

                if today_tasks:
                    for task in today_tasks:
                        urgency = "🔥" * task.get('urgency', 1)
                        f.write(f"- {urgency} {task_link(task)}\n")
                else:
                    f.write("No tasks due today.\n")
            logger.info(f"Updated Today's Tasks view with {len(today_tasks)} tasks")
        except IOError as e:
            logger.error(f"Error updating Today's Tasks view: {e}", exc_info=True)

        try:
            # Overdue tasks
            overdue_tasks = [t for t in tasks if not t.get('completed', False)
                             and t.get('deadline') and t.get('deadline') < today]
            overdue_tasks_path = os.path.join(self.vault_path, "tasks", "overdue.md")
            with open(overdue_tasks_path, "w", encoding="utf-8") as f:
                f.write("# Overdue Tasks\n\n")

                if overdue_tasks:
                    for task in overdue_tasks:
                        urgency = "🔥" * task.get('urgency', 1)
                        f.write(f"- {urgency} {task_link(task)}{due_suffix(task)}\n")
                else:
                    f.write("No overdue tasks.\n")
            logger.info(f"Updated Overdue Tasks view with {len(overdue_tasks)} tasks")
        except IOError as e:
            logger.error(f"Error updating Overdue Tasks view: {e}", exc_info=True)
        except Exception as e:
            logger.error(f"Unexpected error updating task views: {e}", exc_info=True)
    def create_statistics_file(self, stats: Dict[str, Any]) -> None:

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brysontang/DeltaTask'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.