update_task
Modify existing tasks in DeltaTask MCP Server by providing task ID and update details to change task properties, status, or organization.
Instructions
Update an existing task.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| task_id | Yes | ||
| updates | Yes |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- server.py:47-50 (handler)The primary MCP tool handler for 'update_task'. This async function is decorated with @mcp.tool() for automatic registration and executes the tool logic by delegating to TaskService.update_task_by_id.
@mcp.tool() async def update_task(task_id: str, updates: dict[str, Any]) -> dict[str, Any]: """Update an existing task.""" return service.update_task_by_id(task_id, updates) - Core handler implementing the update_task logic: validates updates, persists to database via repository, syncs markdown file via ObsidianMarkdownManager.update_task_file, and refreshes all task views.
def update_task_by_id(self, task_id: str, updates: Dict[str, Any]) -> Dict[str, Any]: """Update a task and return success status.""" logger.info(f"Updating task {task_id} with: {updates}") try: # Check if task exists existing_task = self.repository.get_todo_by_id(task_id) if not existing_task: logger.warning(f"Attempted to update non-existent task: {task_id}") return {"error": "Task not found"} # Validate fibonacci sequence for effort valid_efforts = [1, 2, 3, 5, 8, 13, 21] if 'effort' in updates and updates['effort'] not in valid_efforts: error_msg = f"Effort must be a Fibonacci number from {valid_efforts}" logger.error(f"Invalid effort value {updates['effort']} for task {task_id}") raise ValueError(error_msg) # Validate urgency if 'urgency' in updates and not 1 <= updates['urgency'] <= 5: error_msg = "Urgency must be between 1 and 5" logger.error(f"Invalid urgency value {updates['urgency']} for task {task_id}") raise ValueError(error_msg) try: # Update in database success = self.repository.update_todo(task_id, updates) if not success: logger.error(f"Database update failed for task {task_id}") return {"error": "Failed to update task in database"} logger.info(f"Task {task_id} updated in database") except Exception as e: logger.error(f"Error updating task {task_id} in database: {e}", exc_info=True) raise try: # Get updated task updated_task = self.repository.get_todo_by_id(task_id) if not updated_task: logger.error(f"Could not retrieve updated task {task_id}") return {"error": "Failed to retrieve updated task"} # Update markdown file self.markdown_manager.update_task_file(updated_task) logger.info(f"Markdown file updated for task {task_id}") except Exception as e: logger.error(f"Error updating markdown for task {task_id}: {e}", exc_info=True) # Continue even if markdown update fails try: # Update views self._update_all_views() logger.info("Task views updated after task update") except Exception as e: logger.error(f"Error updating views after task update: {e}", exc_info=True) # Continue even if views update fails return {"message": "Task updated successfully"} except ValueError as e: # Handle validation errors logger.error(f"Validation error updating task {task_id}: {e}", exc_info=True) return {"error": str(e)} except Exception as e: # Handle other errors logger.error(f"Unexpected error updating task {task_id}: {e}", exc_info=True) return {"error": f"Failed to update task: {str(e)}"} - Helper function called during task update to synchronize the task's individual markdown file with updated data, handling frontmatter, content, links, and tag updates.
def update_task_file(self, task: Dict[str, Any]) -> None: """Update a task markdown file.""" # First try with the new format (ID - title) sanitized_title = self._sanitize_filename(task["title"]) task_file = os.path.join(self.vault_path, "tasks", f"{task['id']} - {sanitized_title}.md") # If not found, try the old format (just ID) if not os.path.exists(task_file): old_format_file = os.path.join(self.vault_path, "tasks", f"{task['id']}.md") if os.path.exists(old_format_file): logger.info(f"Found task file in old format, using: {old_format_file}") task_file = old_format_file else: # If file doesn't exist in either format, create it logger.info(f"Task file {task['id']} not found, creating new file") self.create_task_file(task) return try: post = frontmatter.load(task_file) # Update frontmatter fields old_title = post.get("title", "") new_title = task["title"] post["title"] = new_title post["updated"] = task.get("updated", datetime.now().isoformat()) post["urgency"] = task.get("urgency", post.get("urgency", 1)) post["effort"] = task.get("effort", post.get("effort", 1)) post["completed"] = task.get("completed", post.get("completed", False)) if "deadline" in task: post["deadline"] = task["deadline"] elif "deadline" in post and task.get("deadline") is None: del post["deadline"] # If title has changed, update links in all child tasks if old_title != new_title: self._update_child_parent_links(task["id"], new_title) # Handle description separately if "description" in task: # Preserve the subtasks and related sections sections = post.content.split("## Subtasks") if len(sections) >= 2: post.content = task["description"] + "\n\n## Subtasks" + sections[1] else: post.content = task["description"] + "\n\n## Subtasks\n\n\n\n## Related\n\n" logger.warning(f"Couldn't find Subtasks section in {task['id']}, recreating structure") # Check for tags update to update tag files old_tags = post.get('tags', []) new_tags = task.get('tags', old_tags) # Update tags in frontmatter if "tags" in task: post["tags"] = task["tags"] try: # Write back to file with open(task_file, "wb") as f: frontmatter.dump(post, f) logger.info(f"Updated task file {task['id']}") except IOError as e: logger.error(f"Error writing to task file {task_file}: {e}", exc_info=True) raise # Update tag files if tags changed if new_tags != old_tags: logger.info(f"Tags changed for task {task['id']}, updating tag files") # Remove from old tags for tag in old_tags: if tag not in new_tags: self._remove_task_from_tag(tag, task["id"]) # Add to new tags for tag in new_tags: if tag not in old_tags: self._update_tag_files([tag], task["id"], task["title"]) except frontmatter.FrontmatterError as e: logger.error(f"Frontmatter error for task {task['id']}: {e}", exc_info=True) # Attempt recovery by recreating the file logger.info(f"Attempting to recreate task file {task['id']}") self.create_task_file(task) except Exception as e: logger.error(f"Error updating task file {task['id']}: {e}", exc_info=True) raise - Helper function that regenerates aggregated markdown views (all, urgent, today, overdue) after task updates to reflect current state.
def update_task_views(self, tasks: List[Dict[str, Any]]) -> None: """Update the task view files based on current tasks.""" logger.info("Updating task view files") try: # All tasks view all_tasks_path = os.path.join(self.vault_path, "tasks", "all.md") with open(all_tasks_path, "w") as f: f.write("# All Tasks\n\n") if tasks: for task in tasks: completed = "✅ " if task.get('completed', False) else "" deadline = f" (Due: {task.get('deadline', 'No deadline')})" if 'deadline' in task else "" sanitized_title = self._sanitize_filename(task['title']) f.write(f"- {completed}[[tasks/{task['id']} - {sanitized_title}]]{deadline}\n") else: f.write("No tasks found.\n") logger.info("Updated All Tasks view") except IOError as e: logger.error(f"Error updating All Tasks view: {e}", exc_info=True) try: # Urgent tasks view urgent_tasks_path = os.path.join(self.vault_path, "tasks", "urgent.md") with open(urgent_tasks_path, "w") as f: f.write("# Urgent Tasks\n\n") urgent_tasks = [t for t in tasks if not t.get('completed', False) and t.get('urgency', 1) >= 4] if urgent_tasks: for task in urgent_tasks: urgency = "🔥" * task.get('urgency', 1) deadline = f" (Due: {task.get('deadline', 'No deadline')})" if 'deadline' in task else "" sanitized_title = self._sanitize_filename(task['title']) f.write(f"- {urgency} [[tasks/{task['id']} - {sanitized_title}]]{deadline}\n") else: f.write("No urgent tasks found.\n") logger.info(f"Updated Urgent Tasks view with {len(urgent_tasks) if 'urgent_tasks' in locals() else 0} tasks") except IOError as e: logger.error(f"Error updating Urgent Tasks view: {e}", exc_info=True) try: # Today's tasks today_tasks_path = os.path.join(self.vault_path, "tasks", "today.md") with open(today_tasks_path, "w") as f: f.write("# Due Today\n\n") today = datetime.now().date().isoformat() today_tasks = [t for t in tasks if not t.get('completed', False) and t.get('deadline') == today] if today_tasks: for task in today_tasks: urgency = "🔥" * task.get('urgency', 1) sanitized_title = self._sanitize_filename(task['title']) f.write(f"- {urgency} [[tasks/{task['id']} - {sanitized_title}]]\n") else: f.write("No tasks due today.\n") logger.info(f"Updated Today's Tasks view with {len(today_tasks) if 'today_tasks' in locals() else 0} tasks") except IOError as e: logger.error(f"Error updating Today's Tasks view: {e}", exc_info=True) try: # Overdue tasks overdue_tasks_path = os.path.join(self.vault_path, "tasks", "overdue.md") with open(overdue_tasks_path, "w") as f: f.write("# Overdue Tasks\n\n") today = datetime.now().date().isoformat() overdue_tasks = [t for t in tasks if not t.get('completed', False) and t.get('deadline') and t.get('deadline') < today] if overdue_tasks: for task in overdue_tasks: urgency = "🔥" * task.get('urgency', 1) deadline = f" (Due: {task.get('deadline')})" sanitized_title = self._sanitize_filename(task['title']) f.write(f"- {urgency} [[tasks/{task['id']} - {sanitized_title}]]{deadline}\n") else: f.write("No overdue tasks.\n") logger.info(f"Updated Overdue Tasks view with {len(overdue_tasks) if 'overdue_tasks' in locals() else 0} tasks") except IOError as e: logger.error(f"Error updating Overdue Tasks view: {e}", exc_info=True) except Exception as e: logger.error(f"Unexpected error updating task views: {e}", exc_info=True) def create_statistics_file(self, stats: Dict[str, Any]) -> None: