# task_manager.py
# Aidderall MCP Server - Hierarchical task management for AI assistants
# Copyright (C) 2024 Briam R. <briamr@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from .models import MainTask, SubTask, Task, TaskStatus
class TaskManager:
    """In-memory manager for a two-level hierarchy of tasks.

    Main tasks are kept in ``global_tasks`` in creation order and each one
    carries its own list of sub tasks. Completed tasks stay in the hierarchy
    and are additionally recorded in ``completed_tasks`` in completion order.
    Which task is "current" is resolved by the ``current_task`` property.
    """

    def __init__(self) -> None:
        self.global_tasks: List[MainTask] = []
        self.completed_tasks: List[Task] = []
        # Focus override installed by switch_focus(); cleared by any
        # operation that moves the focus itself.
        self._manual_current_task: Optional[Task] = None

    # ------------------------------------------------------------------
    # Serialization helpers
    # ------------------------------------------------------------------
    def _task_to_basic_dict(self, task: Task) -> Dict[str, Any]:
        """Convert task to basic dictionary with id, title, and status."""
        return {
            "id": task.id,
            "title": task.title,
            "status": task.status.value,
        }

    def _task_to_standard_dict(self, task: Task) -> Dict[str, Any]:
        """Convert task to standard dictionary with basic info plus created_at."""
        result = self._task_to_basic_dict(task)
        result["created_at"] = task.created_at.isoformat()
        return result

    def _task_to_full_dict(self, task: Task) -> Dict[str, Any]:
        """Convert task to full dictionary with all available information.

        ``completed_at`` is only present when the task has a completion time.
        """
        result = self._task_to_standard_dict(task)
        result["body"] = task.body
        if task.completed_at:
            result["completed_at"] = task.completed_at.isoformat()
        return result

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _find_parent(self, task: Task) -> Optional[MainTask]:
        """Return the main task whose sub task list contains ``task``, if any."""
        for main_task in self.global_tasks:
            if task in main_task.sub_tasks:
                return main_task
        return None

    def _demote_current_task(self) -> None:
        """Set the focused task to PENDING unless it is already completed.

        ``current_task`` can fall back to a COMPLETED task (for example when
        every task is done). Such a task must keep its COMPLETED status
        instead of being silently re-opened.
        """
        current = self.current_task
        if current is not None and current.status != TaskStatus.COMPLETED:
            current.status = TaskStatus.PENDING

    # ------------------------------------------------------------------
    # State queries
    # ------------------------------------------------------------------
    @property
    def current_task(self) -> Optional[Task]:
        """Return the task that currently has focus.

        Resolution order: ``None`` when there are no tasks; the task chosen
        through ``switch_focus``; the first task whose status is CURRENT
        (each main task is checked before its sub tasks); otherwise the
        rightmost task of the last main task, whatever its status.
        """
        if not self.global_tasks:
            return None

        # getattr keeps this working on instances that never ran __init__.
        manual = getattr(self, "_manual_current_task", None)
        if manual:
            return manual

        for main_task in self.global_tasks:
            if main_task.status == TaskStatus.CURRENT:
                return main_task
            for sub_task in main_task.sub_tasks:
                if sub_task.status == TaskStatus.CURRENT:
                    return sub_task

        return self.global_tasks[-1].get_rightmost_task()

    @property
    def is_zen_state(self) -> bool:
        """Return True when there are no tasks or every task is COMPLETED."""
        return all(
            main_task.status == TaskStatus.COMPLETED
            and all(
                sub_task.status == TaskStatus.COMPLETED
                for sub_task in main_task.sub_tasks
            )
            for main_task in self.global_tasks
        )

    # ------------------------------------------------------------------
    # Mutations
    # ------------------------------------------------------------------
    def create_new_task(self, title: str, body: str) -> MainTask:
        """Append a new main task and give it focus.

        The previously focused task becomes PENDING unless it is already
        COMPLETED. Any manual focus is cleared.
        """
        new_task = MainTask(title=title, body=body)
        new_task.status = TaskStatus.CURRENT
        self._demote_current_task()
        self.global_tasks.append(new_task)
        self._manual_current_task = None
        return new_task

    def extend_current_task(self, title: str, body: str) -> SubTask:
        """Append a new sub task to the last main task and give it focus.

        Raises:
            ValueError: If the manager is in zen state.
        """
        if self.is_zen_state:
            raise ValueError(
                "Cannot extend task in zen state. Create a new task first."
            )

        new_sub_task = SubTask(title=title, body=body)
        new_sub_task.status = TaskStatus.CURRENT
        self._demote_current_task()
        # Sub tasks are always attached to the last main task.
        self.global_tasks[-1].add_sub_task(new_sub_task)
        self._manual_current_task = None
        return new_sub_task

    def complete_current_task(self) -> Optional[Task]:
        """Complete the focused task and move focus to the next open task.

        For a sub task, focus moves to the nearest incomplete sibling on its
        left, then to its parent if the parent is incomplete, then to the
        result of a right-to-left search over the whole structure. For a main
        task only that search is used.

        Returns:
            The task that was completed, or ``None`` when there is no focused
            task or the focused task is already COMPLETED (so a task is never
            recorded in ``completed_tasks`` twice).
        """
        current = self.current_task
        if current is None or current.status == TaskStatus.COMPLETED:
            return None

        current.status = TaskStatus.COMPLETED
        current.completed_at = datetime.now()
        self.completed_tasks.append(current)
        self._manual_current_task = None

        if not isinstance(current, SubTask):
            self._find_and_focus_previous_incomplete_task()
            return current

        for main_task in self.global_tasks:
            if current not in main_task.sub_tasks:
                continue
            position = main_task.sub_tasks.index(current)
            # Nearest incomplete sibling to the left wins.
            for sibling in reversed(main_task.sub_tasks[:position]):
                if sibling.status != TaskStatus.COMPLETED:
                    sibling.status = TaskStatus.CURRENT
                    return current
            if main_task.status != TaskStatus.COMPLETED:
                main_task.status = TaskStatus.CURRENT
            else:
                self._find_and_focus_previous_incomplete_task()
            break
        return current

    def _find_and_focus_previous_incomplete_task(self) -> None:
        """Find and focus on the previous incomplete task in the global stack.

        Main tasks are scanned right to left; within each, sub tasks are
        scanned right to left before the main task itself. If nothing is
        incomplete no status is changed (zen state).
        """
        for main_task in reversed(self.global_tasks):
            for sub_task in reversed(main_task.sub_tasks):
                if sub_task.status != TaskStatus.COMPLETED:
                    sub_task.status = TaskStatus.CURRENT
                    return
            if main_task.status != TaskStatus.COMPLETED:
                main_task.status = TaskStatus.CURRENT
                return

    def update_current_task(self, body: str) -> Task:
        """Replace the body of the focused task and return that task.

        Raises:
            ValueError: If there is no focused task.
        """
        current = self.current_task
        if current is None:
            raise ValueError("No current task to update")
        current.body = body
        return current

    # ------------------------------------------------------------------
    # Navigation and reporting
    # ------------------------------------------------------------------
    def get_stack_depth(self) -> int:
        """Return the number of main tasks plus the last main task's sub tasks.

        Returns 0 in zen state.
        """
        if self.is_zen_state:
            return 0
        # Not being in zen state guarantees at least one main task.
        return len(self.global_tasks) + len(self.global_tasks[-1].sub_tasks)

    def get_siblings_to_left(self) -> List[SubTask]:
        """Return the sub tasks that precede the focused sub task in its parent.

        Returns an empty list in zen state, when the focused task is not a
        sub task, or when its parent cannot be found.
        """
        if self.is_zen_state or not self.global_tasks:
            return []
        current = self.current_task
        if not isinstance(current, SubTask):
            return []
        parent = self._find_parent(current)
        if parent is None:
            return []
        return parent.sub_tasks[: parent.sub_tasks.index(current)]

    def get_breadcrumb_trail(self) -> List[Task]:
        """Return a new list holding every main task in stack order.

        Sub tasks are never included.
        """
        return list(self.global_tasks)

    def get_big_picture(self, format: str = "text") -> Any:
        """Describe the whole hierarchy.

        Args:
            format: ``"json"`` returns a dict with ``zen_state`` and
                ``tasks``; any other value returns a text rendering with one
                line per task and a marker on the focused task.
        """
        current = self.current_task

        if format == "json":
            tasks = [
                {
                    "id": main_task.id,
                    "title": main_task.title,
                    "status": main_task.status.value,
                    "is_current": main_task == current,
                    "created_at": main_task.created_at.isoformat(),
                    "sub_tasks": [
                        {
                            "id": sub_task.id,
                            "title": sub_task.title,
                            "status": sub_task.status.value,
                            "is_current": sub_task == current,
                            "created_at": sub_task.created_at.isoformat(),
                        }
                        for sub_task in main_task.sub_tasks
                    ],
                }
                for main_task in self.global_tasks
            ]
            return {"zen_state": self.is_zen_state, "tasks": tasks}

        here = " <-- YOU ARE HERE"
        lines: List[str] = []
        for main_task in self.global_tasks:
            marker = here if main_task == current else ""
            lines.append(f"{main_task.title} ({main_task.status.value}){marker}")
            for sub_task in main_task.sub_tasks:
                marker = here if sub_task == current else ""
                lines.append(f" {sub_task.title} ({sub_task.status.value}){marker}")

        if not lines:
            return "No tasks (zen state)"
        text = "\n".join(lines)
        if self.is_zen_state:
            return text + "\n\nAll tasks completed (zen state)"
        return text

    def get_completed_tasks(self, order: str = "chronological") -> List[Task]:
        """Return a copy of the completed tasks.

        Args:
            order: ``"chronological"`` for completion order, ``"logical"``
                for the reverse of it.

        Raises:
            ValueError: For any other ``order`` value.
        """
        if order == "chronological":
            return self.completed_tasks.copy()
        if order == "logical":
            return list(reversed(self.completed_tasks))
        raise ValueError("Order must be 'chronological' or 'logical'")

    def get_focus_path(self) -> str:
        """Generate a visual representation of the current focus path.

        The path is the containing main task followed, when the focused task
        is a sub task, by every sub task up to and including the focused one.
        """
        if self.is_zen_state:
            return "No active tasks (zen state)"
        current = self.current_task
        if current is None:
            return "No active tasks"

        if isinstance(current, MainTask):
            containing_main: Optional[MainTask] = current
        else:
            containing_main = self._find_parent(current)

        parts: List[str] = []
        if containing_main is not None:
            parts.append(containing_main.title)
            if current in containing_main.sub_tasks:
                for sub_task in containing_main.sub_tasks:
                    parts.append(sub_task.title)
                    if sub_task == current:
                        break

        if parts:
            parts[-1] = f"{parts[-1]} ←YOU ARE HERE"
        return " → ".join(parts)

    def get_commitment_count(self) -> int:
        """Get the number of tasks that must be completed before returning to parent.

        Computed as the focused sub task plus its siblings to the left;
        0 in zen state or when the focused task is a main task.
        """
        current = self.current_task
        if self.is_zen_state or current is None:
            return 0
        if isinstance(current, MainTask):
            return 0
        return len(self.get_siblings_to_left()) + 1

    def get_stack_overview(self) -> Dict[str, Any]:
        """Return a dict snapshot of the hierarchy with summary counters."""
        current = self.current_task
        return {
            "zen_state": self.is_zen_state,
            "stack_depth": self.get_stack_depth(),
            "current_task_id": current.id if current else None,
            "global_tasks": [
                {
                    "id": task.id,
                    "title": task.title,
                    "status": task.status.value,
                    "created_at": task.created_at.isoformat(),
                    "completed_at": (
                        task.completed_at.isoformat() if task.completed_at else None
                    ),
                    "sub_tasks": [
                        {
                            "id": sub.id,
                            "title": sub.title,
                            "status": sub.status.value,
                            "created_at": sub.created_at.isoformat(),
                            "completed_at": (
                                sub.completed_at.isoformat()
                                if sub.completed_at
                                else None
                            ),
                        }
                        for sub in task.sub_tasks
                    ],
                }
                for task in self.global_tasks
            ],
            "completed_count": len(self.completed_tasks),
        }

    def peek_context(self) -> Tuple[Optional[Task], Optional[Task]]:
        """Return ``(parent context, immediate context)`` for the focused task.

        For a sub task: its containing main task and the sibling directly to
        its left (``None`` when there is none). For a main task: the main
        task before it and that task's last sub task, or the task itself when
        it has no sub tasks. ``(None, None)`` when nothing is focused or the
        focused main task has no predecessor.
        """
        current = self.current_task
        if current is None:
            return None, None

        if isinstance(current, SubTask):
            parent = self._find_parent(current)
            if parent is None:
                # Focused sub task is not in the structure; keep the
                # historical answer of reporting the last main task.
                parent = self.global_tasks[-1]
            siblings = self.get_siblings_to_left()
            return parent, (siblings[-1] if siblings else None)

        try:
            position = self.global_tasks.index(current)
        except ValueError:
            return None, None
        if position == 0:
            return None, None
        prev_main_task = self.global_tasks[position - 1]
        prev_immediate_context: Task = (
            prev_main_task.sub_tasks[-1]
            if prev_main_task.sub_tasks
            else prev_main_task
        )
        return prev_main_task, prev_immediate_context

    def list_siblings(self) -> List[SubTask]:
        """Alias for ``get_siblings_to_left``."""
        return self.get_siblings_to_left()

    def switch_focus(self, task_id: str) -> Dict[str, Any]:
        """Switch focus to any task that is not completed, by ID.

        The previously focused task becomes PENDING unless it is already
        COMPLETED; the target becomes CURRENT and is remembered as the manual
        focus.

        Raises:
            ValueError: If no incomplete task has the given ID.
        """
        target_task: Optional[Task] = next(
            (
                task
                for main_task in self.global_tasks
                for task in (main_task, *main_task.sub_tasks)
                if task.id == task_id and task.status != TaskStatus.COMPLETED
            ),
            None,
        )
        if target_task is None:
            raise ValueError(f"Task with ID '{task_id}' not found or already completed")

        if target_task == self.current_task:
            return {
                "success": True,
                "message": "Task is already the current focus",
                "task_id": task_id,
            }

        self._demote_current_task()
        target_task.status = TaskStatus.CURRENT
        self._manual_current_task = target_task

        return {
            "success": True,
            "message": f"Switched focus to task: {target_task.title}",
            "task_id": task_id,
            "new_focus_path": self.get_focus_path(),
        }

    def remove_task(self, task_id: str) -> Dict[str, Any]:
        """Remove a task from the structure (but keep in completed_tasks if it was completed).

        Removing a main task removes its sub tasks with it. Manual focus is
        cleared when it pointed at the removed task or at one of the removed
        main task's sub tasks. No other task is given CURRENT status here.

        Raises:
            ValueError: If no task in the structure has the given ID.
        """
        for main_index, main_task in enumerate(self.global_tasks):
            if main_task.id == task_id:
                del self.global_tasks[main_index]
                manual = self._manual_current_task
                if manual is not None and (
                    manual == main_task or manual in main_task.sub_tasks
                ):
                    self._manual_current_task = None
                return {
                    "success": True,
                    "message": f"Removed task: {main_task.title}",
                    "task_id": task_id,
                    "was_completed": main_task.status == TaskStatus.COMPLETED,
                }

            for sub_index, sub_task in enumerate(main_task.sub_tasks):
                if sub_task.id == task_id:
                    del main_task.sub_tasks[sub_index]
                    if sub_task == self._manual_current_task:
                        self._manual_current_task = None
                    return {
                        "success": True,
                        "message": f"Removed subtask: {sub_task.title}",
                        "task_id": task_id,
                        "was_completed": sub_task.status == TaskStatus.COMPLETED,
                    }

        raise ValueError(f"Task with ID '{task_id}' not found in structure")