# update_task
Modify an existing task in the DeltaTask MCP Server by providing its task ID and a dictionary of updates that change the task's properties, status, or organization.
## Instructions
Update an existing task.
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| task_id | Yes | ID of the task to update. | |
| updates | Yes | Dictionary of fields to change; the implementation handles keys such as `title`, `description`, `urgency` (1-5), `effort` (Fibonacci: 1, 2, 3, 5, 8, 13, 21), `deadline`, `completed`, and `tags`. | |
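
For reference, a call to this tool might pass arguments shaped like the following. The field names inside `updates` mirror those handled by the service and markdown code shown below; the values are purely illustrative:

```python
# Illustrative arguments for a call to update_task; the task_id is a placeholder
# and must refer to an existing task in your own database.
arguments = {
    "task_id": "3f2a9c1e-example-id",
    "updates": {
        "title": "Ship v1.2 release",
        "urgency": 4,              # integer between 1 and 5
        "effort": 5,               # Fibonacci value: 1, 2, 3, 5, 8, 13, or 21
        "deadline": "2025-09-30",  # ISO date string
        "completed": False,
        "tags": ["release"],
    },
}
```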
## Implementation Reference
- server.py:47-50 (handler): The primary MCP tool handler for `update_task`. This async function is decorated with `@mcp.tool()` for automatic registration and executes the tool logic by delegating to `TaskService.update_task_by_id`.

  ```python
  @mcp.tool()
  async def update_task(task_id: str, updates: dict[str, Any]) -> dict[str, Any]:
      """Update an existing task."""
      return service.update_task_by_id(task_id, updates)
  ```
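
  The handler reports failures as a return value rather than an exception, so callers branch on the dictionary's keys. A minimal sketch of that convention (the `handle_result` helper is hypothetical, not part of the server):

  ```python
  def handle_result(result: dict) -> str:
      """Turn the tool's dict result into a message, raising on the error shape."""
      # The service returns {"error": "..."} on failure and {"message": "..."} on success.
      if "error" in result:
          raise RuntimeError(f"update_task failed: {result['error']}")
      return result["message"]

  print(handle_result({"message": "Task updated successfully"}))  # Task updated successfully
  ```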
- Core handler implementing the update_task logic: it validates the updates, persists them to the database via the repository, syncs the markdown file via `ObsidianMarkdownManager.update_task_file`, and refreshes all task views.

  ```python
  def update_task_by_id(self, task_id: str, updates: Dict[str, Any]) -> Dict[str, Any]:
      """Update a task and return success status."""
      logger.info(f"Updating task {task_id} with: {updates}")
      try:
          # Check if task exists
          existing_task = self.repository.get_todo_by_id(task_id)
          if not existing_task:
              logger.warning(f"Attempted to update non-existent task: {task_id}")
              return {"error": "Task not found"}

          # Validate fibonacci sequence for effort
          valid_efforts = [1, 2, 3, 5, 8, 13, 21]
          if 'effort' in updates and updates['effort'] not in valid_efforts:
              error_msg = f"Effort must be a Fibonacci number from {valid_efforts}"
              logger.error(f"Invalid effort value {updates['effort']} for task {task_id}")
              raise ValueError(error_msg)

          # Validate urgency
          if 'urgency' in updates and not 1 <= updates['urgency'] <= 5:
              error_msg = "Urgency must be between 1 and 5"
              logger.error(f"Invalid urgency value {updates['urgency']} for task {task_id}")
              raise ValueError(error_msg)

          try:
              # Update in database
              success = self.repository.update_todo(task_id, updates)
              if not success:
                  logger.error(f"Database update failed for task {task_id}")
                  return {"error": "Failed to update task in database"}
              logger.info(f"Task {task_id} updated in database")
          except Exception as e:
              logger.error(f"Error updating task {task_id} in database: {e}", exc_info=True)
              raise

          try:
              # Get updated task
              updated_task = self.repository.get_todo_by_id(task_id)
              if not updated_task:
                  logger.error(f"Could not retrieve updated task {task_id}")
                  return {"error": "Failed to retrieve updated task"}

              # Update markdown file
              self.markdown_manager.update_task_file(updated_task)
              logger.info(f"Markdown file updated for task {task_id}")
          except Exception as e:
              logger.error(f"Error updating markdown for task {task_id}: {e}", exc_info=True)
              # Continue even if markdown update fails

          try:
              # Update views
              self._update_all_views()
              logger.info("Task views updated after task update")
          except Exception as e:
              logger.error(f"Error updating views after task update: {e}", exc_info=True)
              # Continue even if views update fails

          return {"message": "Task updated successfully"}
      except ValueError as e:
          # Handle validation errors
          logger.error(f"Validation error updating task {task_id}: {e}", exc_info=True)
          return {"error": str(e)}
      except Exception as e:
          # Handle other errors
          logger.error(f"Unexpected error updating task {task_id}: {e}", exc_info=True)
          return {"error": f"Failed to update task: {str(e)}"}
  ```
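
  The two validation rules are easy to mirror on the client side before calling the tool; a small sketch using the same constants as the service (the `validate_updates` helper is illustrative, not part of the codebase):

  ```python
  VALID_EFFORTS = {1, 2, 3, 5, 8, 13, 21}

  def validate_updates(updates: dict) -> list[str]:
      """Return the validation problems the service would reject, if any."""
      problems = []
      if "effort" in updates and updates["effort"] not in VALID_EFFORTS:
          problems.append(f"Effort must be a Fibonacci number from {sorted(VALID_EFFORTS)}")
      if "urgency" in updates and not 1 <= updates["urgency"] <= 5:
          problems.append("Urgency must be between 1 and 5")
      return problems

  print(validate_updates({"effort": 4}))                # one problem: 4 is not a valid effort
  print(validate_updates({"urgency": 3, "effort": 8}))  # []
  ```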
- Helper function called during a task update to synchronize the task's individual markdown file with the updated data, handling frontmatter, content, links, and tag updates.

  ```python
  def update_task_file(self, task: Dict[str, Any]) -> None:
      """Update a task markdown file."""
      # First try with the new format (ID - title)
      sanitized_title = self._sanitize_filename(task["title"])
      task_file = os.path.join(self.vault_path, "tasks", f"{task['id']} - {sanitized_title}.md")

      # If not found, try the old format (just ID)
      if not os.path.exists(task_file):
          old_format_file = os.path.join(self.vault_path, "tasks", f"{task['id']}.md")
          if os.path.exists(old_format_file):
              logger.info(f"Found task file in old format, using: {old_format_file}")
              task_file = old_format_file
          else:
              # If file doesn't exist in either format, create it
              logger.info(f"Task file {task['id']} not found, creating new file")
              self.create_task_file(task)
              return

      try:
          post = frontmatter.load(task_file)

          # Update frontmatter fields
          old_title = post.get("title", "")
          new_title = task["title"]
          post["title"] = new_title
          post["updated"] = task.get("updated", datetime.now().isoformat())
          post["urgency"] = task.get("urgency", post.get("urgency", 1))
          post["effort"] = task.get("effort", post.get("effort", 1))
          post["completed"] = task.get("completed", post.get("completed", False))

          if "deadline" in task:
              post["deadline"] = task["deadline"]
          elif "deadline" in post and task.get("deadline") is None:
              del post["deadline"]

          # If title has changed, update links in all child tasks
          if old_title != new_title:
              self._update_child_parent_links(task["id"], new_title)

          # Handle description separately
          if "description" in task:
              # Preserve the subtasks and related sections
              sections = post.content.split("## Subtasks")
              if len(sections) >= 2:
                  post.content = task["description"] + "\n\n## Subtasks" + sections[1]
              else:
                  post.content = task["description"] + "\n\n## Subtasks\n\n\n\n## Related\n\n"
                  logger.warning(f"Couldn't find Subtasks section in {task['id']}, recreating structure")

          # Check for tags update to update tag files
          old_tags = post.get('tags', [])
          new_tags = task.get('tags', old_tags)

          # Update tags in frontmatter
          if "tags" in task:
              post["tags"] = task["tags"]

          try:
              # Write back to file
              with open(task_file, "wb") as f:
                  frontmatter.dump(post, f)
              logger.info(f"Updated task file {task['id']}")
          except IOError as e:
              logger.error(f"Error writing to task file {task_file}: {e}", exc_info=True)
              raise

          # Update tag files if tags changed
          if new_tags != old_tags:
              logger.info(f"Tags changed for task {task['id']}, updating tag files")
              # Remove from old tags
              for tag in old_tags:
                  if tag not in new_tags:
                      self._remove_task_from_tag(tag, task["id"])
              # Add to new tags
              for tag in new_tags:
                  if tag not in old_tags:
                      self._update_tag_files([tag], task["id"], task["title"])
      except frontmatter.FrontmatterError as e:
          logger.error(f"Frontmatter error for task {task['id']}: {e}", exc_info=True)
          # Attempt recovery by recreating the file
          logger.info(f"Attempting to recreate task file {task['id']}")
          self.create_task_file(task)
      except Exception as e:
          logger.error(f"Error updating task file {task['id']}: {e}", exc_info=True)
          raise
  ```
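
  The frontmatter fields this helper maintains can be inspected with the same `python-frontmatter` library; a minimal sketch with an illustrative vault path and file name (real notes live under `<vault_path>/tasks/<id> - <sanitized title>.md`):

  ```python
  import os

  import frontmatter  # python-frontmatter, the same library the manager uses

  # Illustrative path; substitute your vault location and an actual task note.
  task_file = os.path.join("/path/to/vault", "tasks", "abc123 - Ship v1.2 release.md")

  post = frontmatter.load(task_file)
  print(post["title"], post["urgency"], post["effort"], post.get("completed", False))
  print(post.get("tags", []))
  # The note body holds the description followed by "## Subtasks" and "## Related"
  # sections, which update_task_file preserves when rewriting the description.
  print(post.content.split("## Subtasks")[0].strip())
  ```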
- Helper function that regenerates the aggregated markdown views (all, urgent, today, overdue) after task updates so they reflect the current state.

  ```python
  def update_task_views(self, tasks: List[Dict[str, Any]]) -> None:
      """Update the task view files based on current tasks."""
      logger.info("Updating task view files")
      try:
          # All tasks view
          all_tasks_path = os.path.join(self.vault_path, "tasks", "all.md")
          with open(all_tasks_path, "w") as f:
              f.write("# All Tasks\n\n")
              if tasks:
                  for task in tasks:
                      completed = "✅ " if task.get('completed', False) else ""
                      deadline = f" (Due: {task.get('deadline', 'No deadline')})" if 'deadline' in task else ""
                      sanitized_title = self._sanitize_filename(task['title'])
                      f.write(f"- {completed}[[tasks/{task['id']} - {sanitized_title}]]{deadline}\n")
              else:
                  f.write("No tasks found.\n")
          logger.info("Updated All Tasks view")
      except IOError as e:
          logger.error(f"Error updating All Tasks view: {e}", exc_info=True)

      try:
          # Urgent tasks view
          urgent_tasks_path = os.path.join(self.vault_path, "tasks", "urgent.md")
          with open(urgent_tasks_path, "w") as f:
              f.write("# Urgent Tasks\n\n")
              urgent_tasks = [t for t in tasks if not t.get('completed', False) and t.get('urgency', 1) >= 4]
              if urgent_tasks:
                  for task in urgent_tasks:
                      urgency = "🔥" * task.get('urgency', 1)
                      deadline = f" (Due: {task.get('deadline', 'No deadline')})" if 'deadline' in task else ""
                      sanitized_title = self._sanitize_filename(task['title'])
                      f.write(f"- {urgency} [[tasks/{task['id']} - {sanitized_title}]]{deadline}\n")
              else:
                  f.write("No urgent tasks found.\n")
          logger.info(f"Updated Urgent Tasks view with {len(urgent_tasks) if 'urgent_tasks' in locals() else 0} tasks")
      except IOError as e:
          logger.error(f"Error updating Urgent Tasks view: {e}", exc_info=True)

      try:
          # Today's tasks
          today_tasks_path = os.path.join(self.vault_path, "tasks", "today.md")
          with open(today_tasks_path, "w") as f:
              f.write("# Due Today\n\n")
              today = datetime.now().date().isoformat()
              today_tasks = [t for t in tasks if not t.get('completed', False) and t.get('deadline') == today]
              if today_tasks:
                  for task in today_tasks:
                      urgency = "🔥" * task.get('urgency', 1)
                      sanitized_title = self._sanitize_filename(task['title'])
                      f.write(f"- {urgency} [[tasks/{task['id']} - {sanitized_title}]]\n")
              else:
                  f.write("No tasks due today.\n")
          logger.info(f"Updated Today's Tasks view with {len(today_tasks) if 'today_tasks' in locals() else 0} tasks")
      except IOError as e:
          logger.error(f"Error updating Today's Tasks view: {e}", exc_info=True)

      try:
          # Overdue tasks
          overdue_tasks_path = os.path.join(self.vault_path, "tasks", "overdue.md")
          with open(overdue_tasks_path, "w") as f:
              f.write("# Overdue Tasks\n\n")
              today = datetime.now().date().isoformat()
              overdue_tasks = [t for t in tasks if not t.get('completed', False) and t.get('deadline') and t.get('deadline') < today]
              if overdue_tasks:
                  for task in overdue_tasks:
                      urgency = "🔥" * task.get('urgency', 1)
                      deadline = f" (Due: {task.get('deadline')})"
                      sanitized_title = self._sanitize_filename(task['title'])
                      f.write(f"- {urgency} [[tasks/{task['id']} - {sanitized_title}]]{deadline}\n")
              else:
                  f.write("No overdue tasks.\n")
          logger.info(f"Updated Overdue Tasks view with {len(overdue_tasks) if 'overdue_tasks' in locals() else 0} tasks")
      except IOError as e:
          logger.error(f"Error updating Overdue Tasks view: {e}", exc_info=True)
      except Exception as e:
          logger.error(f"Unexpected error updating task views: {e}", exc_info=True)
  ```
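
  The filters behind the urgent, today, and overdue views are straightforward; a small sketch with illustrative task data showing how a list would be bucketed:

  ```python
  from datetime import date

  # Illustrative tasks using the same field names the view code reads.
  tasks = [
      {"id": "a1", "title": "Ship release", "urgency": 5, "deadline": "2024-01-10", "completed": False},
      {"id": "b2", "title": "Write docs", "urgency": 2, "deadline": date.today().isoformat(), "completed": False},
      {"id": "c3", "title": "Old chore", "urgency": 3, "deadline": "2000-01-01", "completed": True},
  ]

  today = date.today().isoformat()
  urgent = [t for t in tasks if not t.get("completed", False) and t.get("urgency", 1) >= 4]
  due_today = [t for t in tasks if not t.get("completed", False) and t.get("deadline") == today]
  overdue = [t for t in tasks if not t.get("completed", False) and t.get("deadline") and t["deadline"] < today]

  print([t["id"] for t in urgent])     # ['a1']
  print([t["id"] for t in due_today])  # ['b2']
  print([t["id"] for t in overdue])    # ['a1'] -- its 2024 deadline is past; completed tasks are excluded
  ```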