Skip to main content
Glama

Task Manager MCP Server

task_manager.py (26.8 kB)
"""Markdown-backed task manager exposed as MCP tools.

Each project's tasks live in ``tasks/<project_name>.md``.  The file is both the
storage format and the human-readable view, so everything written by
``TaskManager._generate_markdown`` must be readable again by
``TaskManager._parse_markdown_tasks``.
"""

import json
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml
from mcp.server.fastmcp import Context, FastMCP

# All task files are read and written with one explicit encoding so that
# non-ASCII titles survive regardless of the platform's locale default.
_ENCODING = "utf-8"
_VALID_STATUSES = ("todo", "done")

# "## Task: Title" (written by add_task) or "## Task 3: [UX] Title (P2)"
# (written by _generate_markdown).  Group 1 is set only for the numbered form.
_TASK_HEADER_RE = re.compile(r'^##\s+Task(\s+\d+)?:\s*(.+?)\s*$')
_CATEGORY_RE = re.compile(r'^(\[[^\]]+\])\s+(.+)$')
_PRIORITY_RE = re.compile(r'^(.*\S)\s*\((P\d+)\)$')
_SECTION_RE = re.compile(r'^###\s+(Dependencies|Subtasks|Complexity|Status):\s*(.*)$')
_SUBTASK_RE = re.compile(r'^-\s+\[(.)\]\s+(.+)$')
_DEPENDENCY_RE = re.compile(r'^-\s+Task\s+(\d+)\s*$')
_HOURS_RE = re.compile(r'^Estimated hours:\s*(.+?)\s*$')

# "-" and "*" only count as bullets when followed by whitespace, so "---"
# rules and "**bold**" lines are not mistaken for list items.
_BULLET_RE = re.compile(r'^(?:[-*]\s+|•\s*)')
_INLINE_CODE_RE = re.compile(r'`[^`]*`')
_BOLD_RE = re.compile(r'\*\*([^*]*)\*\*')
_LINK_RE = re.compile(r'\[([^\]]+)\]\([^)]+\)')


def _validate_project_name(project_name: str) -> str:
    """Return *project_name* unchanged if it is safe to use as a file name.

    Raises:
        ValueError: if the name is empty or could escape the target directory
            (path separators, NUL, ``.`` or ``..``).
    """
    if not project_name or not project_name.strip():
        raise ValueError("Project name must not be empty")
    if project_name in ('.', '..') or any(ch in project_name for ch in ('/', '\\', '\0')):
        raise ValueError(f"Invalid project name: {project_name!r}")
    return project_name


def _parse_hours(text: str) -> Any:
    """Convert an "Estimated hours" value to int or float, else keep the text."""
    for cast in (int, float):
        try:
            return cast(text)
        except ValueError:
            continue
    return text


def _find_task(tasks: List[dict], task_title: str) -> Optional[dict]:
    """Return the first task whose title equals *task_title* (ignoring outer whitespace)."""
    wanted = task_title.strip()
    return next((task for task in tasks if task['title'].strip() == wanted), None)


def _split_prd_sections(prd_content: str) -> Dict[str, str]:
    """Split PRD markdown into ``{heading: body}`` using ``#`` and ``##`` headings.

    Text before the first heading is discarded; a repeated heading overwrites
    the earlier one.  Deeper headings (``###``...) stay inside the body.
    """
    sections: Dict[str, str] = {}
    current_section = None
    current_content: List[str] = []

    for line in prd_content.split('\n'):
        if line.startswith(('# ', '## ')):
            if current_section:
                sections[current_section] = '\n'.join(current_content)
            current_section = line.lstrip('#').strip()
            current_content = []
        else:
            current_content.append(line)

    if current_section:
        sections[current_section] = '\n'.join(current_content)
    return sections


def extract_bullet_points(content: str) -> List[str]:
    """Extract bullet points from text content.

    Recognizes ``-``, ``*`` and ``•`` list markers.  Inline code spans are
    removed, bold markers are dropped, and markdown links are reduced to their
    link text.  Empty results are skipped.
    """
    points = []
    for line in content.split('\n'):
        line = line.strip()
        marker = _BULLET_RE.match(line)
        if not marker:
            continue
        cleaned = line[marker.end():]
        cleaned = _INLINE_CODE_RE.sub('', cleaned)
        cleaned = _BOLD_RE.sub(r'\1', cleaned)
        cleaned = _LINK_RE.sub(r'\1', cleaned)
        cleaned = cleaned.strip()
        if cleaned:
            points.append(cleaned)
    return points


class TaskManager:
    """Reads and writes per-project markdown task files under ``tasks_dir``."""

    def __init__(self, tasks_dir: str = "tasks"):
        self.tasks_dir = Path(tasks_dir)
        self.tasks_dir.mkdir(parents=True, exist_ok=True)

    def _get_task_file(self, project_name: str) -> Path:
        """Return the task file path for a project.

        Raises:
            ValueError: if *project_name* is not a safe file name.
        """
        return self.tasks_dir / f"{_validate_project_name(project_name)}.md"

    def read_tasks(self, project_name: str) -> Optional[List[dict]]:
        """Return the parsed tasks of a project, or None if it has no task file."""
        task_file = self._get_task_file(project_name)
        if not task_file.exists():
            return None
        return self._parse_markdown_tasks(task_file.read_text(encoding=_ENCODING))

    def write_tasks(self, project_name: str, tasks: List[dict]) -> Path:
        """Replace a project's task file with markdown generated from *tasks*."""
        task_file = self._get_task_file(project_name)
        task_file.write_text(self._generate_markdown(tasks), encoding=_ENCODING)
        return task_file

    @staticmethod
    def _task_from_header(text: str, numbered: bool) -> dict:
        """Build an empty task dict from the text following ``## Task...:``.

        Category and priority are only split off for the numbered form, which
        is the form this class generates; a hand-written ``## Task: Title``
        keeps its title verbatim.
        """
        task = {
            'title': text,
            'description': '',
            'subtasks': [],
            'dependencies': [],
            'status': 'todo'
        }
        if numbered:
            priority = _PRIORITY_RE.match(text)
            if priority:
                text, task['priority'] = priority.group(1), priority.group(2)
            category = _CATEGORY_RE.match(text)
            if category:
                task['category'], text = category.group(1), category.group(2)
            task['title'] = text
        return task

    def _parse_markdown_tasks(self, content: str) -> List[dict]:
        """Parse task markdown into a list of task dicts.

        Understands both header forms (see ``_TASK_HEADER_RE``) and the
        ``### Dependencies / Complexity / Status / Subtasks`` sections that
        ``_generate_markdown`` emits, so a generate -> parse cycle is lossless
        for those fields.  Anything before the first task header is ignored.
        Other non-empty lines that do not start with ``-`` are collected into
        the task description.
        """
        tasks: List[dict] = []
        current: Optional[dict] = None
        section = 'description'

        for raw_line in content.split('\n'):
            line = raw_line.rstrip('\r')  # tolerate CRLF files

            header = _TASK_HEADER_RE.match(line)
            if header:
                if current is not None:
                    tasks.append(current)
                current = self._task_from_header(
                    header.group(2), numbered=header.group(1) is not None
                )
                section = 'description'
                continue

            if current is None:
                continue

            section_match = _SECTION_RE.match(line)
            if section_match:
                name = section_match.group(1)
                value = section_match.group(2).strip()
                section = name.lower()
                if name == 'Complexity' and value:
                    current['complexity'] = value
                elif name == 'Status' and value:
                    current['status'] = value.lower()
                continue

            subtask = _SUBTASK_RE.match(line)
            if subtask:
                current['subtasks'].append({
                    'title': subtask.group(2),
                    'status': 'done' if subtask.group(1) in ('x', 'X') else 'todo'
                })
                continue

            if section == 'dependencies':
                dependency = _DEPENDENCY_RE.match(line)
                if dependency:
                    current['dependencies'].append(int(dependency.group(1)))
                    continue

            if section == 'complexity':
                hours = _HOURS_RE.match(line)
                if hours:
                    current['estimated_hours'] = _parse_hours(hours.group(1))
                    continue

            # Remaining "-" lines (separators, unknown bullets) are dropped.
            if line.strip() and not line.startswith('-'):
                current['description'] += line + '\n'

        if current is not None:
            tasks.append(current)
        return tasks

    def _generate_markdown(self, tasks: List[dict]) -> str:
        """Generate markdown content from tasks list.

        Tasks are numbered from 1 in list order; those numbers are what
        ``dependencies`` entries refer to.  Optional keys (``category``,
        ``priority``, ``dependencies``, ``complexity``, ``estimated_hours``,
        ``status``, ``subtasks``) are emitted only when present; priority
        defaults to ``P2``.
        """
        parts = [
            "# Project Tasks\n\n",
            # Task categories explanation
            "## Categories\n",
            "- [MVP] Core functionality tasks\n",
            "- [AI] AI-related features\n",
            "- [UX] User experience improvements\n",
            "- [INFRA] Infrastructure and setup\n\n",
            # Priority levels explanation
            "## Priority Levels\n",
            "- P0: Blocker/Critical\n",
            "- P1: High Priority\n",
            "- P2: Medium Priority\n",
            "- P3: Low Priority\n\n",
        ]

        for idx, task in enumerate(tasks, 1):
            category = task.get('category') or ''
            priority = task.get('priority') or 'P2'
            # Skip the category slot entirely when empty (no double space).
            heading = ' '.join(piece for piece in (category, task['title']) if piece)
            parts.append(f"## Task {idx}: {heading} ({priority})\n\n")

            description = (task.get('description') or '').strip()
            if description:
                parts.append(f"{description}\n\n")

            # Only a completed task needs a marker; "todo" is the parse default.
            if task.get('status') == 'done':
                parts.append("### Status: done\n\n")

            if task.get('dependencies'):
                parts.append("### Dependencies:\n")
                parts.extend(f"- Task {dep}\n" for dep in task['dependencies'])
                parts.append("\n")

            if task.get('complexity'):
                parts.append(f"### Complexity: {task['complexity']}\n")
                if task.get('estimated_hours') is not None:
                    parts.append(f"Estimated hours: {task['estimated_hours']}\n")
                parts.append("\n")

            if task.get('subtasks'):
                parts.append("### Subtasks:\n\n")
                for subtask in task['subtasks']:
                    box = 'x' if subtask.get('status') == 'done' else ' '
                    parts.append(f"- [{box}] {subtask['title']}\n")
                parts.append("\n")

            # Separator between tasks
            parts.append("---\n\n")

        return ''.join(parts)


def create_mcp() -> FastMCP:
    """Create a new MCP instance with task management tools."""
    mcp = FastMCP(
        "TASK MANAGER",
        description="Markdown-based task management system with PRD parsing",
        host=os.getenv("HOST", "0.0.0.0"),
        port=int(os.getenv("PORT", "8050"))
    )

    # One manager shared by every tool registered below.
    manager = TaskManager()

    @mcp.tool()
    async def create_task_file(ctx: Context, project_name: str) -> str:
        """Create a new markdown task file for a project.

        Args:
            project_name: Name of the project

        Returns:
            Confirmation message with file path
        """
        try:
            task_file = manager._get_task_file(project_name)
            if task_file.exists():
                return f"Task file already exists at {task_file}"

            task_file.write_text("# Project Tasks\n\n", encoding=_ENCODING)
            return f"Created new task file at {task_file}"
        except Exception as e:
            return f"Error creating task file: {str(e)}"

    @mcp.tool()
    async def add_task(ctx: Context, project_name: str, title: str, description: str,
                       subtasks: Optional[List[str]] = None, batch_mode: bool = False) -> str:
        """Add a new task to a project's task file.

        Args:
            project_name: Name of the project
            title: Task title
            description: Task description
            subtasks: Optional list of subtasks
            batch_mode: If True, create the task file when it does not exist
                yet instead of reporting an error (for bulk additions)

        Returns:
            Confirmation message
        """
        try:
            task_file = manager._get_task_file(project_name)
            if not task_file.exists() and not batch_mode:
                return f"Task file not found for project {project_name}"

            # The title lives on a single header line, so fold any whitespace
            # (including newlines) and refuse a title the parser could not read.
            title = ' '.join((title or '').split())
            if not title:
                return "Error adding task: title must not be empty"

            existing_content = ""
            if task_file.exists():
                existing_content = task_file.read_text(encoding=_ENCODING).strip()

            # If this is the first task, add the header
            if not existing_content:
                existing_content = "# Project Tasks\n\n"

            pieces = [f"\n## Task: {title}\n\n"]
            body = (description or '').strip()
            if body:
                pieces.append(f"{body}\n\n")
            if subtasks:
                pieces.append("### Subtasks:\n\n")
                pieces.extend(f"- [ ] {subtask}\n" for subtask in subtasks)
                pieces.append("\n")
            pieces.append("---\n\n")

            # Append the new task to the existing content
            task_file.write_text(
                existing_content.rstrip() + "\n\n" + ''.join(pieces),
                encoding=_ENCODING
            )
            return f"Added new task '{title}' to {project_name}"
        except Exception as e:
            return f"Error adding task: {str(e)}"

    @mcp.tool()
    async def parse_prd(ctx: Context, project_name: str, prd_content: str) -> str:
        """Parse a PRD and create tasks from it.

        Builds a fixed task skeleton (setup, auth, UX, cloud) plus feature
        tasks taken from the bullets of the PRD's "Key Features" section, and
        writes it to the project's task file, replacing any existing file.

        Args:
            project_name: Name of the project
            prd_content: Markdown text of the PRD

        Returns:
            Confirmation message with file path
        """
        try:
            sections = _split_prd_sections(prd_content)
            features = (
                extract_bullet_points(sections['Key Features'])
                if 'Key Features' in sections else None
            )

            tasks: List[dict] = []

            def add(task: dict) -> int:
                """Append *task* and return its 1-based number for dependency lists."""
                tasks.append(task)
                return len(tasks)

            def todo(titles: List[str]) -> List[dict]:
                return [{'title': st, 'status': 'todo'} for st in titles]

            # Project Setup (First task)
            setup = add({
                'title': 'Project Setup',
                'description': 'Set up the Next.js project with TypeScript and Tailwind CSS',
                'category': '[INFRA]',
                'priority': 'P0',
                'complexity': 'low',
                'estimated_hours': 4,
                'dependencies': [],
                'subtasks': todo([
                    'Initialize Next.js project',
                    'Configure TypeScript',
                    'Set up Tailwind CSS',
                    'Configure development environment',
                    'Set up testing framework'
                ])
            })

            # Core Features from Key Features section (MVP Phase)
            core = None
            if features is not None:
                mvp_features = [f for f in features if 'AI' not in f and 'cloud' not in f.lower()]
                core = add({
                    'title': 'Implement Core Features',
                    'description': 'Implement the core MVP features of the journaling app',
                    'category': '[MVP]',
                    'priority': 'P0',
                    'complexity': 'medium',
                    'estimated_hours': 8,
                    'dependencies': [setup],
                    'subtasks': todo(mvp_features)
                })

            # Authentication & Storage (MVP Phase)
            auth = add({
                'title': 'Authentication & Local Storage',
                'description': 'Implement user authentication and local storage features',
                'category': '[MVP]',
                'priority': 'P1',
                'complexity': 'medium',
                'estimated_hours': 8,
                'dependencies': [setup],
                'subtasks': todo([
                    'Implement email authentication',
                    'Set up local storage with IndexedDB',
                    'Add user session management',
                    'Implement data persistence'
                ])
            })

            # Dependency numbers are derived from where tasks actually landed,
            # so they stay correct when the Core Features task is absent.
            core_and_auth = [number for number in (core, auth) if number is not None]

            # AI Features (Phase 2)
            if features is not None:
                ai_features = [
                    f for f in features
                    if 'AI' in f or 'summarize' in f.lower() or 'pattern' in f.lower()
                ]
                if ai_features:
                    add({
                        'title': 'Implement AI Features',
                        'description': 'Add AI-powered features for insights and analysis',
                        'category': '[AI]',
                        'priority': 'P2',
                        'complexity': 'high',
                        'estimated_hours': 16,
                        'dependencies': list(core_and_auth),
                        'subtasks': todo(ai_features)
                    })

            # UI/UX Features
            add({
                'title': 'Enhance UI/UX',
                'description': 'Implement UI/UX improvements and polish',
                'category': '[UX]',
                'priority': 'P2',
                'complexity': 'medium',
                'estimated_hours': 8,
                'dependencies': [core if core is not None else setup],
                'subtasks': todo([
                    'Implement dark/light mode',
                    'Add responsive design',
                    'Create minimalist editor',
                    'Add keyboard shortcuts'
                ])
            })

            # Cloud Features (Phase 3)
            add({
                'title': 'Implement Cloud Features',
                'description': 'Add cloud sync and advanced storage features',
                'category': '[INFRA]',
                'priority': 'P3',
                'complexity': 'high',
                'estimated_hours': 16,
                'dependencies': list(core_and_auth),
                'subtasks': todo([
                    'Set up cloud sync',
                    'Implement end-to-end encryption',
                    'Add offline support',
                    'Create backup/restore functionality'
                ])
            })

            task_file = manager.write_tasks(project_name, tasks)
            return f"Successfully created tasks from PRD in {task_file}"
        except Exception as e:
            return f"Error parsing PRD: {str(e)}"

    @mcp.tool()
    async def update_task_status(ctx: Context, project_name: str, task_title: str,
                                 subtask_title: Optional[str] = None, status: str = "done") -> str:
        """Update the status of a task or subtask.

        Args:
            project_name: Name of the project
            task_title: Title of the task
            subtask_title: Optional title of the subtask
            status: New status (todo/done)

        Returns:
            Confirmation message, or a message naming what could not be found
        """
        try:
            new_status = (status or '').strip().lower()
            if new_status not in _VALID_STATUSES:
                return f"Error updating status: invalid status '{status}' (expected todo or done)"

            tasks = manager.read_tasks(project_name)
            if tasks is None:
                return f"Task file not found for project {project_name}"

            task = _find_task(tasks, task_title)
            if task is None:
                return f"Task '{task_title}' not found"

            if subtask_title:
                subtask = next(
                    (st for st in task['subtasks'] if st['title'] == subtask_title), None
                )
                if subtask is None:
                    return f"Subtask '{subtask_title}' not found in task '{task_title}'"
                subtask['status'] = new_status
                if new_status == 'todo':
                    # A task with reopened work can no longer count as done.
                    task['status'] = 'todo'
            else:
                task['status'] = new_status

            manager.write_tasks(project_name, tasks)
            return f"Updated status of {'subtask' if subtask_title else 'task'} to {new_status}"
        except Exception as e:
            return f"Error updating status: {str(e)}"

    @mcp.tool()
    async def get_next_task(ctx: Context, project_name: str) -> str:
        """Get the next uncompleted task from a project.

        Args:
            project_name: Name of the project

        Returns:
            JSON with ``task``, ``subtask`` and ``description`` for the first
            open item (``subtask`` is null for a task that has no subtasks),
            or a completion message
        """
        try:
            tasks = manager.read_tasks(project_name)
            if tasks is None:
                return f"Task file not found for project {project_name}"

            for task in tasks:
                if task['status'] != 'todo':
                    continue

                if not task['subtasks']:
                    # Nothing to tick off individually: the task itself is next.
                    return json.dumps({
                        'task': task['title'],
                        'subtask': None,
                        'description': task['description']
                    })

                pending = next(
                    (st for st in task['subtasks'] if st['status'] == 'todo'), None
                )
                if pending is not None:
                    return json.dumps({
                        'task': task['title'],
                        'subtask': pending['title'],
                        'description': task['description']
                    })

            return "All tasks are completed!"
        except Exception as e:
            return f"Error getting next task: {str(e)}"

    @mcp.tool()
    async def expand_task(ctx: Context, project_name: str, task_title: str) -> str:
        """Break down a task into smaller, more manageable subtasks.

        Currently appends a fixed placeholder list of subtasks; no AI call is
        made yet.

        Args:
            project_name: Name of the project
            task_title: Title of the task to expand

        Returns:
            Confirmation message with new subtasks
        """
        try:
            tasks = manager.read_tasks(project_name)
            if tasks is None:
                return f"Task file not found for project {project_name}"

            task = _find_task(tasks, task_title)
            if task is None:
                return f"Task '{task_title}' not found"

            # Placeholder until AI-generated subtasks are wired in.
            new_subtasks = [
                "Research existing solutions",
                "Design implementation approach",
                "Write initial code",
                "Test functionality",
                "Review and refine"
            ]
            task['subtasks'].extend({'title': st, 'status': 'todo'} for st in new_subtasks)
            # Newly added open work reopens the task.
            task['status'] = 'todo'

            manager.write_tasks(project_name, tasks)
            return f"Expanded task '{task_title}' with new subtasks"
        except Exception as e:
            return f"Error expanding task: {str(e)}"

    @mcp.tool()
    async def generate_task_file(ctx: Context, project_name: str, task_title: str) -> str:
        """Generate a file template based on a task's description.

        Writes ``<project_name>/<task title in snake case>.py`` relative to the
        working directory, overwriting an existing file of that name.  The
        content is currently a fixed placeholder template.

        Args:
            project_name: Name of the project
            task_title: Title of the task to generate file for

        Returns:
            Confirmation message with file path
        """
        try:
            tasks = manager.read_tasks(project_name)
            if tasks is None:
                return f"Task file not found for project {project_name}"

            task = _find_task(tasks, task_title)
            if task is None:
                return f"Task '{task_title}' not found"

            title = task['title'].strip()
            # Placeholder until AI-generated file content is wired in.
            file_content = (
                f"# File generated from task: {title}\n"
                "\n"
                "def main():\n"
                "    # TODO: Implement functionality\n"
                "    pass\n"
                "\n"
                'if __name__ == "__main__":\n'
                "    main()\n"
            )

            # Path separators in a title (e.g. "Enhance UI/UX") must not turn
            # into sub-directories.
            stem = re.sub(r'[\\/]', '_', title.lower().replace(' ', '_'))
            file_path = Path(project_name) / f"{stem}.py"
            file_path.parent.mkdir(parents=True, exist_ok=True)
            file_path.write_text(file_content, encoding=_ENCODING)

            return f"Generated file template at {file_path}"
        except Exception as e:
            return f"Error generating file: {str(e)}"

    @mcp.tool()
    async def get_task_dependencies(ctx: Context, project_name: str, task_title: str) -> str:
        """Get all tasks that depend on the given task.

        A task counts as dependent when its recorded dependency list contains
        the given task's number, or when its description mentions the given
        title (case-insensitive).

        Args:
            project_name: Name of the project
            task_title: Title of the task to check dependencies for

        Returns:
            JSON string of dependent tasks
        """
        try:
            tasks = manager.read_tasks(project_name)
            if tasks is None:
                return f"Task file not found for project {project_name}"

            wanted = task_title.strip()
            target_number = next(
                (number for number, task in enumerate(tasks, 1)
                 if task['title'].strip() == wanted),
                None
            )
            needle = task_title.lower()

            dependent_tasks = []
            for task in tasks:
                if task['title'] == task_title:
                    continue
                listed = target_number is not None and target_number in task.get('dependencies', [])
                mentioned = needle in task['description'].lower()
                if listed or mentioned:
                    dependent_tasks.append({
                        'title': task['title'],
                        'status': task['status']
                    })

            return json.dumps(dependent_tasks)
        except Exception as e:
            return f"Error getting dependencies: {str(e)}"

    @mcp.tool()
    async def estimate_task_complexity(ctx: Context, project_name: str, task_title: str) -> str:
        """Estimate the complexity of a task.

        Currently always answers "medium"; no AI call is made yet.

        Args:
            project_name: Name of the project
            task_title: Title of the task to estimate

        Returns:
            JSON with the task title, complexity (low/medium/high) and
            estimated hours
        """
        try:
            tasks = manager.read_tasks(project_name)
            if tasks is None:
                return f"Task file not found for project {project_name}"

            if _find_task(tasks, task_title) is None:
                return f"Task '{task_title}' not found"

            # Placeholder until AI-based estimation is wired in.
            complexity = "medium"
            hours_by_complexity = {'low': 4, 'medium': 8}

            return json.dumps({
                'task': task_title,
                'complexity': complexity,
                'estimated_hours': hours_by_complexity.get(complexity, 16)
            })
        except Exception as e:
            return f"Error estimating complexity: {str(e)}"

    @mcp.tool()
    async def suggest_next_actions(ctx: Context, project_name: str, task_title: str) -> str:
        """Suggest next actions for a task.

        Currently returns a fixed placeholder list; no AI call is made yet.

        Args:
            project_name: Name of the project
            task_title: Title of the task to get suggestions for

        Returns:
            JSON string of suggested actions
        """
        try:
            tasks = manager.read_tasks(project_name)
            if tasks is None:
                return f"Task file not found for project {project_name}"

            if _find_task(tasks, task_title) is None:
                return f"Task '{task_title}' not found"

            # Placeholder until AI-generated suggestions are wired in.
            suggestions = [
                "Review existing codebase",
                "Set up development environment",
                "Create initial test cases",
                "Implement core functionality",
                "Write documentation"
            ]

            return json.dumps({
                'task': task_title,
                'suggestions': suggestions
            })
        except Exception as e:
            return f"Error suggesting actions: {str(e)}"

    return mcp


# Create the default MCP instance for backward compatibility
mcp = create_mcp()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tradesdontlie/task-manager-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.