Skip to main content
Glama

Aidderall MCP Server

by cheezcake
task_manager.py17.9 kB
# Aidderall MCP Server - Hierarchical task management for AI assistants
# Copyright (C) 2024 Briam R. <briamr@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional, Tuple

from .models import MainTask, SubTask, Task, TaskStatus


class TaskManager:
    """Manage a list of main tasks, their subtasks, and the current focus.

    Attributes:
        global_tasks: Main tasks in the order they were created.
        completed_tasks: Tasks in the order they were completed. Entries stay
            here even if the task is later removed from ``global_tasks``.
        _manual_current_task: Task selected through ``switch_focus``; when
            set it takes precedence over the status-based lookup.
    """

    def __init__(self) -> None:
        self.global_tasks: List[MainTask] = []
        self.completed_tasks: List[Task] = []
        self._manual_current_task: Optional[Task] = None

    # ------------------------------------------------------------------
    # Serialization helpers
    # ------------------------------------------------------------------

    def _task_to_basic_dict(self, task: Task) -> Dict[str, Any]:
        """Convert task to basic dictionary with id, title, and status."""
        return {
            "id": task.id,
            "title": task.title,
            "status": task.status.value,
        }

    def _task_to_standard_dict(self, task: Task) -> Dict[str, Any]:
        """Convert task to standard dictionary with basic info plus created_at."""
        result = self._task_to_basic_dict(task)
        result["created_at"] = task.created_at.isoformat()
        return result

    def _task_to_full_dict(self, task: Task) -> Dict[str, Any]:
        """Convert task to full dictionary with all available information.

        ``completed_at`` is only included when the task has one.
        """
        result = self._task_to_standard_dict(task)
        result["body"] = task.body
        if task.completed_at:
            result["completed_at"] = task.completed_at.isoformat()
        return result

    # ------------------------------------------------------------------
    # Internal lookup helpers
    # ------------------------------------------------------------------

    def _iter_tasks(self) -> Iterator[Task]:
        """Yield every main task followed by its subtasks, in list order."""
        for main_task in self.global_tasks:
            yield main_task
            yield from main_task.sub_tasks

    def _find_parent(self, task: Task) -> Optional[MainTask]:
        """Return the main task whose ``sub_tasks`` contains *task*, if any."""
        for main_task in self.global_tasks:
            if task in main_task.sub_tasks:
                return main_task
        return None

    def _release_focus(self) -> None:
        """Demote the current task to PENDING before focus moves elsewhere.

        ``current_task`` can fall back to the rightmost task even when that
        task is already COMPLETED (no task carries CURRENT status). Such a
        task must keep its COMPLETED status, so it is left untouched.
        """
        current = self.current_task
        if current and current.status != TaskStatus.COMPLETED:
            current.status = TaskStatus.PENDING

    # ------------------------------------------------------------------
    # State queries
    # ------------------------------------------------------------------

    @property
    def current_task(self) -> Optional[Task]:
        """Return the task that currently has focus.

        Resolution order: ``None`` when there are no tasks; the task chosen
        via ``switch_focus`` if one is set; the first task with CURRENT
        status; otherwise the rightmost task of the last main task (which
        may already be completed).
        """
        if not self.global_tasks:
            return None

        # getattr keeps this safe for instances created without __init__.
        manual = getattr(self, "_manual_current_task", None)
        if manual:
            return manual

        for task in self._iter_tasks():
            if task.status == TaskStatus.CURRENT:
                return task

        return self.global_tasks[-1].get_rightmost_task()

    @property
    def is_zen_state(self) -> bool:
        """Return True when there are no tasks or every task is completed."""
        return all(
            task.status == TaskStatus.COMPLETED for task in self._iter_tasks()
        )

    # ------------------------------------------------------------------
    # Mutations
    # ------------------------------------------------------------------

    def create_new_task(self, title: str, body: str) -> MainTask:
        """Append a new main task and make it the current task.

        The previously focused task (if still incomplete) becomes PENDING and
        any manual focus is cleared.
        """
        new_task = MainTask(title=title, body=body)
        new_task.status = TaskStatus.CURRENT

        # Must run before the append so the lookup sees the old focus.
        self._release_focus()

        self.global_tasks.append(new_task)
        self._manual_current_task = None  # Clear manual focus
        return new_task

    def extend_current_task(self, title: str, body: str) -> SubTask:
        """Add a subtask to the last main task and make it current.

        Note that the subtask is always attached to the last main task in
        ``global_tasks``.

        Raises:
            ValueError: If the manager is in zen state.
        """
        if self.is_zen_state:
            raise ValueError(
                "Cannot extend task in zen state. Create a new task first."
            )

        self._release_focus()

        new_sub_task = SubTask(title=title, body=body)
        new_sub_task.status = TaskStatus.CURRENT
        self.global_tasks[-1].add_sub_task(new_sub_task)
        self._manual_current_task = None  # Clear manual focus
        return new_sub_task

    def complete_current_task(self) -> Optional[Task]:
        """Complete the current task and move focus to the next incomplete one.

        For a subtask, focus moves to the nearest incomplete sibling on its
        left, then to its parent, then to the previous incomplete task in the
        global list. For a main task, focus moves to the previous incomplete
        task in the global list.

        Returns:
            The task that was completed, or ``None`` when there is no current
            task or the only candidate is already completed.
        """
        current = self.current_task
        if not current:
            return None
        if current.status == TaskStatus.COMPLETED:
            # current_task fell back to an already finished task; completing
            # it again would duplicate it in completed_tasks.
            return None

        current.status = TaskStatus.COMPLETED
        current.completed_at = datetime.now()
        self.completed_tasks.append(current)
        self._manual_current_task = None  # Clear manual focus

        if not isinstance(current, SubTask):
            # Current is a main task, find previous incomplete task
            self._find_and_focus_previous_incomplete_task()
            return current

        parent = self._find_parent(current)
        if parent is None:
            return current

        # Look for incomplete siblings to the left, nearest first
        position = parent.sub_tasks.index(current)
        for sibling in reversed(parent.sub_tasks[:position]):
            if sibling.status != TaskStatus.COMPLETED:
                sibling.status = TaskStatus.CURRENT
                return current

        # No incomplete siblings, make parent current if incomplete
        if parent.status != TaskStatus.COMPLETED:
            parent.status = TaskStatus.CURRENT
        else:
            self._find_and_focus_previous_incomplete_task()
        return current

    def _find_and_focus_previous_incomplete_task(self) -> None:
        """Find and focus on the previous incomplete task in the global stack.

        Main tasks are scanned from right to left; within each, subtasks are
        checked right to left before the main task itself. If nothing is
        incomplete, no status is changed (zen state).
        """
        for main_task in reversed(self.global_tasks):
            for sub_task in reversed(main_task.sub_tasks):
                if sub_task.status != TaskStatus.COMPLETED:
                    sub_task.status = TaskStatus.CURRENT
                    return
            if main_task.status != TaskStatus.COMPLETED:
                main_task.status = TaskStatus.CURRENT
                return

    def update_current_task(self, body: str) -> Task:
        """Replace the body of the current task and return the task.

        Raises:
            ValueError: If there is no current task.
        """
        current = self.current_task
        if not current:
            raise ValueError("No current task to update")
        current.body = body
        return current

    # ------------------------------------------------------------------
    # Read-only views
    # ------------------------------------------------------------------

    def get_stack_depth(self) -> int:
        """Get the total depth of the current task stack.

        Returns 0 in zen state; otherwise the number of main tasks plus the
        number of subtasks of the last main task.
        """
        if self.is_zen_state:
            return 0
        # Not in zen state implies global_tasks is non-empty.
        return len(self.global_tasks) + len(self.global_tasks[-1].sub_tasks)

    def get_siblings_to_left(self) -> List[SubTask]:
        """Return the subtasks that precede the current subtask in its parent.

        Returns an empty list in zen state, when the current task is a main
        task, or when the current task has no parent in the structure.
        """
        if self.is_zen_state or not self.global_tasks:
            return []

        current = self.current_task
        if current is None or isinstance(current, MainTask):
            return []

        parent = self._find_parent(current)
        if parent is None:
            return []

        # Slice relative to the current subtask, not the end of the list, so
        # the result stays correct after focus has moved left or was switched.
        return parent.sub_tasks[: parent.sub_tasks.index(current)]

    def get_breadcrumb_trail(self) -> List[Task]:
        """Return all main tasks in order (subtasks are not included)."""
        trail: List[Task] = list(self.global_tasks)
        return trail

    def get_big_picture(self, format: str = "text") -> Any:
        """Render the whole task structure.

        Args:
            format: ``"json"`` returns a dict with ``zen_state`` and
                ``tasks``; any other value returns a text rendering.
        """
        # The property walks the structure; resolve it once per call.
        current = self.current_task

        if format == "json":
            tasks = []
            for main_task in self.global_tasks:
                task_data: Dict[str, Any] = {
                    "id": main_task.id,
                    "title": main_task.title,
                    "status": main_task.status.value,
                    "is_current": main_task == current,
                    "created_at": main_task.created_at.isoformat(),
                    "sub_tasks": [
                        {
                            "id": sub_task.id,
                            "title": sub_task.title,
                            "status": sub_task.status.value,
                            "is_current": sub_task == current,
                            "created_at": sub_task.created_at.isoformat(),
                        }
                        for sub_task in main_task.sub_tasks
                    ],
                }
                tasks.append(task_data)
            return {"zen_state": self.is_zen_state, "tasks": tasks}

        # text format
        lines = []
        for main_task in self.global_tasks:
            status = main_task.status.value
            marker = " <-- YOU ARE HERE" if main_task == current else ""
            lines.append(f"{main_task.title} ({status}){marker}")
            for sub_task in main_task.sub_tasks:
                status = sub_task.status.value
                marker = " <-- YOU ARE HERE" if sub_task == current else ""
                lines.append(f" {sub_task.title} ({status}){marker}")

        if not lines:
            return "No tasks (zen state)"
        if self.is_zen_state:
            return "\n".join(lines) + "\n\nAll tasks completed (zen state)"
        return "\n".join(lines)

    def get_completed_tasks(self, order: str = "chronological") -> List[Task]:
        """Return a copy of the completed tasks.

        Args:
            order: ``"chronological"`` for completion order, ``"logical"``
                for the reverse.

        Raises:
            ValueError: If *order* is neither of the two accepted values.
        """
        if order == "chronological":
            return self.completed_tasks.copy()
        if order == "logical":
            return list(reversed(self.completed_tasks))
        raise ValueError("Order must be 'chronological' or 'logical'")

    def get_focus_path(self) -> str:
        """Generate a visual representation of the current focus path.

        The path is the containing main task's title followed by the titles
        of its subtasks up to and including the current one.
        """
        if self.is_zen_state:
            return "No active tasks (zen state)"

        current = self.current_task
        if not current:
            return "No active tasks"

        containing_main: Optional[MainTask]
        if isinstance(current, MainTask):
            containing_main = current
        else:
            containing_main = self._find_parent(current)

        parts: List[str] = []
        if containing_main:
            parts.append(containing_main.title)
            # Add subtasks if current is a subtask
            if containing_main.sub_tasks and current in containing_main.sub_tasks:
                for sub_task in containing_main.sub_tasks:
                    parts.append(sub_task.title)
                    if sub_task == current:
                        break

        # Mark the current task
        if parts:
            parts[-1] = f"{parts[-1]} ←YOU ARE HERE"
        return " → ".join(parts)

    def get_commitment_count(self) -> int:
        """Get the number of tasks that must be completed before returning to parent.

        Returns 0 in zen state or when the current task is a main task.
        Otherwise counts the current subtask plus the incomplete siblings to
        its left, which is the set ``complete_current_task`` visits before
        focus returns to the parent.
        """
        current = self.current_task
        if self.is_zen_state or not current:
            return 0
        if isinstance(current, MainTask):
            return 0

        pending_left = [
            sibling
            for sibling in self.get_siblings_to_left()
            if sibling.status != TaskStatus.COMPLETED
        ]
        return len(pending_left) + 1

    def get_stack_overview(self) -> Dict[str, Any]:
        """Return a JSON-serializable snapshot of the whole manager state."""
        current = self.current_task
        return {
            "zen_state": self.is_zen_state,
            "stack_depth": self.get_stack_depth(),
            "current_task_id": current.id if current else None,
            "global_tasks": [
                {
                    "id": task.id,
                    "title": task.title,
                    "status": task.status.value,
                    "created_at": task.created_at.isoformat(),
                    "completed_at": (
                        task.completed_at.isoformat() if task.completed_at else None
                    ),
                    "sub_tasks": [
                        {
                            "id": sub.id,
                            "title": sub.title,
                            "status": sub.status.value,
                            "created_at": sub.created_at.isoformat(),
                            "completed_at": (
                                sub.completed_at.isoformat()
                                if sub.completed_at
                                else None
                            ),
                        }
                        for sub in task.sub_tasks
                    ],
                }
                for task in self.global_tasks
            ],
            "completed_count": len(self.completed_tasks),
        }

    def peek_context(self) -> Tuple[Optional[Task], Optional[Task]]:
        """Return ``(parent_context, immediate_context)`` for the current task.

        For a subtask: its parent main task and the sibling directly to its
        left (or ``None``). For a main task: the main task before it and that
        task's last subtask (or the task itself when it has no subtasks).
        Returns ``(None, None)`` when there is no current task or no
        preceding main task.
        """
        current = self.current_task
        if not current:
            return None, None

        if isinstance(current, SubTask):
            parent = self._find_parent(current)
            if parent is None:
                parent = self.global_tasks[-1]
            siblings = self.get_siblings_to_left()
            immediate_context = siblings[-1] if siblings else None
            return parent, immediate_context

        # Locate the current main task; fall back to the last position when
        # it cannot be found in the list.
        position = len(self.global_tasks) - 1
        for index, main_task in enumerate(self.global_tasks):
            if main_task == current:
                position = index
                break

        if position < 1:
            return None, None

        prev_main_task = self.global_tasks[position - 1]
        prev_immediate_context: Task = (
            prev_main_task.sub_tasks[-1]
            if prev_main_task.sub_tasks
            else prev_main_task
        )
        return prev_main_task, prev_immediate_context

    def list_siblings(self) -> List[SubTask]:
        """Alias for ``get_siblings_to_left``."""
        return self.get_siblings_to_left()

    # ------------------------------------------------------------------
    # Focus switching and removal
    # ------------------------------------------------------------------

    def switch_focus(self, task_id: str) -> Dict[str, Any]:
        """Switch focus to any pending task by ID.

        Returns:
            A dict with ``success``, ``message`` and ``task_id``; when focus
            actually changed it also carries ``new_focus_path``.

        Raises:
            ValueError: If no incomplete task with *task_id* exists.
        """
        target_task: Optional[Task] = None
        for candidate in self._iter_tasks():
            if candidate.id == task_id and candidate.status != TaskStatus.COMPLETED:
                target_task = candidate
                break

        if not target_task:
            raise ValueError(f"Task with ID '{task_id}' not found or already completed")

        # If it's already the current task, nothing to do
        if target_task == self.current_task:
            return {
                "success": True,
                "message": "Task is already the current focus",
                "task_id": task_id,
            }

        self._release_focus()
        target_task.status = TaskStatus.CURRENT
        self._manual_current_task = target_task

        return {
            "success": True,
            "message": f"Switched focus to task: {target_task.title}",
            "task_id": task_id,
            "new_focus_path": self.get_focus_path(),
        }

    def remove_task(self, task_id: str) -> Dict[str, Any]:
        """Remove a task from the structure (but keep in completed_tasks if it was completed).

        Removing a main task also removes its subtasks from the structure.

        Raises:
            ValueError: If no task with *task_id* exists in the structure.
        """
        manual = self._manual_current_task

        for i, main_task in enumerate(self.global_tasks):
            if main_task.id == task_id:
                del self.global_tasks[i]
                # Drop manual focus if it pointed at this task or at one of
                # the subtasks that left the structure with it.
                if manual is not None and (
                    manual == main_task or manual in main_task.sub_tasks
                ):
                    self._manual_current_task = None
                return {
                    "success": True,
                    "message": f"Removed task: {main_task.title}",
                    "task_id": task_id,
                    "was_completed": main_task.status == TaskStatus.COMPLETED,
                }

            for j, sub_task in enumerate(main_task.sub_tasks):
                if sub_task.id == task_id:
                    del main_task.sub_tasks[j]
                    if sub_task == manual:
                        self._manual_current_task = None
                    return {
                        "success": True,
                        "message": f"Removed subtask: {sub_task.title}",
                        "task_id": task_id,
                        "was_completed": sub_task.status == TaskStatus.COMPLETED,
                    }

        raise ValueError(f"Task with ID '{task_id}' not found in structure")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cheezcake/aidderall_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server