MCP Tasks Organizer
by huntsyea
- tasks_organizer
#!/usr/bin/env python3
import re
import json
import asyncio
import os
from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple
from mcp.server.fastmcp import FastMCP
# Initialize FastMCP server
mcp = FastMCP("tasks-organizer")
# Constants
TASKS_FOLDER = ".tasks"
COMPLETED_PREFIX = "✅"
@mcp.tool()
async def create_task_list(
title: str,
description: str,
repo_path: str = ".",
include_metadata: bool = True
) -> str:
"""Create a new task list and save it to the .tasks folder.
Args:
title: Title for the task list
description: Short 2-3 word description for the filename (e.g., "refactor-authentication")
repo_path: Path to the repository root (defaults to current directory)
include_metadata: Whether to include creation date/time
Returns:
Path to the created task list file
"""
# Sanitize the description for filename use
safe_description = description.lower().replace(" ", "-")
safe_description = re.sub(r'[^a-z0-9\-]', '', safe_description)
# Create .tasks directory if it doesn't exist
tasks_dir = os.path.join(repo_path, TASKS_FOLDER)
if not os.path.exists(tasks_dir):
os.makedirs(tasks_dir)
# Generate markdown content
markdown = f"# {title}\n\n"
if include_metadata:
markdown += f"*Created on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*\n\n"
markdown += "## Tasks\n\n"
markdown += "*No tasks yet*\n"
# Save to file
filename = f"{safe_description}.md"
file_path = os.path.join(tasks_dir, filename)
with open(file_path, 'w') as file:
file.write(markdown)
return f"Created task list at {file_path}"
@mcp.tool()
async def convert_plan_to_tasks(
plan_text: str,
title: str,
description: str,
repo_path: str = ".",
include_metadata: bool = True
) -> str:
"""Convert a Cursor agent's plan text into a formatted Markdown task list and save it.
Args:
plan_text: The plan text from the Cursor agent
title: Title for the task list
description: Short 2-3 word description for the filename (e.g., "refactor-authentication")
repo_path: Path to the repository root (defaults to current directory)
include_metadata: Whether to include metadata like date and time
Returns:
Path to the created task list file
"""
# Sanitize the description for filename use
safe_description = description.lower().replace(" ", "-")
safe_description = re.sub(r'[^a-z0-9\-]', '', safe_description)
# Create .tasks directory if it doesn't exist
tasks_dir = os.path.join(repo_path, TASKS_FOLDER)
if not os.path.exists(tasks_dir):
os.makedirs(tasks_dir)
# Basic structure for our markdown
markdown = f"# {title}\n\n"
# Add metadata if requested
if include_metadata:
markdown += f"*Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*\n\n"
# Extract tasks from the plan text
tasks = extract_tasks(plan_text)
# Format tasks as markdown
if tasks:
markdown += "## Tasks\n\n"
for i, task in enumerate(tasks):
markdown += f"{i+1}. [ ] {task.strip()}\n"
else:
# If no specific tasks were found, format the entire plan
sections = format_plan_sections(plan_text)
markdown += sections
# Save to file
filename = f"{safe_description}.md"
file_path = os.path.join(tasks_dir, filename)
with open(file_path, 'w') as file:
file.write(markdown)
return f"Created task list at {file_path}"
@mcp.tool()
async def add_task(
description: str,
task_text: str,
repo_path: str = ".",
section: str = "Tasks"
) -> str:
"""Add a new task to an existing task list.
Args:
description: The description identifier of the task list file
task_text: Text for the new task
repo_path: Path to the repository root (defaults to current directory)
section: Which section to add the task to (defaults to "Tasks")
Returns:
Updated markdown task list
"""
# Find the task file
task_file, content = find_task_file(description, repo_path)
if not task_file:
return f"Error: Could not find task list with description '{description}'"
lines = content.split('\n')
section_header = f"## {section}"
# Find the section
section_index = -1
for i, line in enumerate(lines):
if line.strip() == section_header:
section_index = i
break
if section_index == -1:
# Section doesn't exist, add it
lines.append("")
lines.append(section_header)
lines.append("")
section_index = len(lines) - 1
# Find where to insert the task
task_start = section_index + 1
task_count = 0
# Count existing tasks
i = task_start
while i < len(lines) and not lines[i].startswith('#'):
if re.match(r'^\d+\.\s+\[[ x]\]', lines[i]):
task_count += 1
i += 1
# Remove "No tasks yet" if it exists
if task_count == 0 and task_start < len(lines) and "*No tasks yet*" in lines[task_start]:
lines[task_start] = f"{task_count + 1}. [ ] {task_text}"
else:
# Insert the new task
insert_index = task_start
while insert_index < len(lines) and not lines[insert_index].startswith('#'):
insert_index += 1
lines.insert(insert_index, f"{task_count + 1}. [ ] {task_text}")
updated_content = '\n'.join(lines)
# Save updated content
with open(task_file, 'w') as file:
file.write(updated_content)
return f"Added task '{task_text}' to {os.path.basename(task_file)}"
@mcp.tool()
async def mark_task_complete(
description: str,
task_number: int,
repo_path: str = ".",
section: str = "Tasks"
) -> str:
"""Mark a specific task as completed.
Args:
description: The description identifier of the task list file
task_number: The number of the task to mark as complete
repo_path: Path to the repository root (defaults to current directory)
section: Which section the task is in (defaults to "Tasks")
Returns:
Updated markdown task list
"""
# Find the task file
task_file, content = find_task_file(description, repo_path)
if not task_file:
return f"Error: Could not find task list with description '{description}'"
lines = content.split('\n')
section_header = f"## {section}"
# Find the section
section_index = -1
for i, line in enumerate(lines):
if line.strip() == section_header:
section_index = i
break
if section_index == -1:
return f"Error: Section '{section}' not found in task list"
# Find the task
task_index = -1
current_task = 0
for i in range(section_index + 1, len(lines)):
if lines[i].startswith('#'):
break
task_match = re.match(r'^(\d+)\.\s+\[[ x]\]', lines[i])
if task_match:
current_task += 1
if current_task == task_number:
task_index = i
break
if task_index == -1:
return f"Error: Task {task_number} not found in section '{section}'"
# Update the task status
lines[task_index] = re.sub(r'^\d+\.\s+\[ \]', f"{task_number}. [x]", lines[task_index])
updated_content = '\n'.join(lines)
# Save updated content
with open(task_file, 'w') as file:
file.write(updated_content)
return f"Marked task {task_number} as complete in {os.path.basename(task_file)}"
@mcp.tool()
async def check_all_tasks_complete(
description: str,
repo_path: str = ".",
) -> str:
"""Check if all tasks are complete and mark the task list as completed.
Args:
description: The description identifier of the task list file
repo_path: Path to the repository root (defaults to current directory)
Returns:
Message indicating if the task list was marked as completed
"""
# Find the task file
task_file, content = find_task_file(description, repo_path)
if not task_file:
return f"Error: Could not find task list with description '{description}'"
# Check if all tasks are complete
all_tasks_complete = True
incomplete_task_count = 0
# Find all task lines
task_matches = re.finditer(r'^\d+\.\s+\[([ x])\]', content, re.MULTILINE)
for match in task_matches:
if match.group(1) == ' ': # Unchecked box
all_tasks_complete = False
incomplete_task_count += 1
if not all_tasks_complete:
return f"Task list has {incomplete_task_count} incomplete tasks. Cannot mark as completed."
# If all tasks are complete, rename the file with the ✅ prefix
filename = os.path.basename(task_file)
if not filename.startswith(COMPLETED_PREFIX):
task_dir = os.path.dirname(task_file)
new_filename = f"{COMPLETED_PREFIX}{filename}"
new_file_path = os.path.join(task_dir, new_filename)
os.rename(task_file, new_file_path)
return f"All tasks complete! Renamed task list to {new_filename}"
else:
return "All tasks are already complete and the list is marked as completed."
@mcp.tool()
async def list_task_files(
repo_path: str = ".",
include_completed: bool = True
) -> str:
"""List all task files in the .tasks directory.
Args:
repo_path: Path to the repository root (defaults to current directory)
include_completed: Whether to include completed task lists
Returns:
List of task files with their completion status
"""
tasks_dir = os.path.join(repo_path, TASKS_FOLDER)
if not os.path.exists(tasks_dir):
return "No .tasks directory exists yet."
# Get list of markdown files
task_files = []
for file in os.listdir(tasks_dir):
if file.endswith('.md'):
is_completed = file.startswith(COMPLETED_PREFIX)
if include_completed or not is_completed:
task_files.append((file, is_completed))
if not task_files:
return "No task lists found."
# Format the output
result = "## Task Lists\n\n"
for filename, is_completed in sorted(task_files):
status = "✅ Complete" if is_completed else "⏳ In Progress"
description = filename.replace(COMPLETED_PREFIX, "").replace(".md", "")
result += f"- **{description}**: {status}\n"
return result
def find_task_file(description: str, repo_path: str) -> Tuple[Optional[str], Optional[str]]:
"""Find a task file by its description.
Args:
description: The description identifier of the task list file
repo_path: Path to the repository root
Returns:
Tuple of (file_path, content) or (None, None) if not found
"""
tasks_dir = os.path.join(repo_path, TASKS_FOLDER)
if not os.path.exists(tasks_dir):
return None, None
# Normalize description for comparison
safe_description = description.lower().replace(" ", "-")
safe_description = re.sub(r'[^a-z0-9\-]', '', safe_description)
# Look for files that match the description
for file in os.listdir(tasks_dir):
if file.endswith('.md'):
# Remove completion prefix and extension for comparison
file_desc = file.replace(COMPLETED_PREFIX, "").replace(".md", "")
if file_desc == safe_description:
file_path = os.path.join(tasks_dir, file)
with open(file_path, 'r') as f:
content = f.read()
return file_path, content
return None, None
def extract_tasks(text: str) -> List[str]:
"""Extract tasks from the plan text.
Looks for common patterns that indicate tasks in the text.
Args:
text: The plan text to parse
Returns:
A list of extracted tasks
"""
tasks = []
# Try to find numbered steps (1. Step one, 2. Step two)
numbered_steps = re.findall(r'\b(\d+\.?\s*[A-Z].*?)(?=\b\d+\.|\n\n|$)', text, re.DOTALL)
if numbered_steps:
# Clean up the steps
tasks = [re.sub(r'^\d+\.?\s*', '', step).strip() for step in numbered_steps]
return tasks
# Try to find bullet points
bullet_points = re.findall(r'(?:^|\n)[*\-•]\s*(.*?)(?=\n[*\-•]|\n\n|$)', text, re.DOTALL)
if bullet_points:
tasks = [point.strip() for point in bullet_points]
return tasks
# Try to find sentences that contain task-like keywords
task_sentences = re.findall(r'(?:need to|should|must|will|going to|let\'s|we can|I\'ll|can|todo|to-do|task)\s+([^\.]+\.)', text, re.IGNORECASE)
if task_sentences:
tasks = [sentence.strip() for sentence in task_sentences]
return tasks
# If all else fails, split by paragraphs and treat each as a potential task
paragraphs = re.split(r'\n\s*\n', text)
if paragraphs:
# Filter out very short paragraphs and headers
tasks = [p.strip() for p in paragraphs if len(p.strip()) > 10 and not p.strip().startswith('#')]
return tasks
return tasks
def format_plan_sections(text: str) -> str:
"""Format the plan into structured sections if no clear tasks were found.
Args:
text: The plan text to format
Returns:
Formatted markdown with sections
"""
# Look for markdown headers already in the text
has_headers = bool(re.search(r'^#+\s+', text, re.MULTILINE))
if has_headers:
# If the text already has headers, preserve them
return text
# Otherwise try to identify sections
sections = {
"## Overview": [],
"## Implementation Details": [],
"## Next Steps": []
}
lines = text.split('\n')
current_section = "## Overview"
for line in lines:
# Simple heuristic: Try to identify section breaks
if re.match(r'^.*(overview|summary|about).*$', line, re.IGNORECASE):
current_section = "## Overview"
continue
elif re.match(r'^.*(implementation|details|how to|approach).*$', line, re.IGNORECASE):
current_section = "## Implementation Details"
continue
elif re.match(r'^.*(next steps|future|todo|to do).*$', line, re.IGNORECASE):
current_section = "## Next Steps"
continue
# Add the line to the current section
if line.strip():
sections[current_section].append(line)
# Build the final markdown
result = ""
for section, content in sections.items():
if content:
result += f"{section}\n\n"
# Format as a list if it's the "Next Steps" section
if section == "## Next Steps":
for i, line in enumerate(content):
result += f"{i+1}. [ ] {line.strip()}\n"
else:
result += "\n".join(content) + "\n\n"
return result
if __name__ == "__main__":
# Initialize and run the server
mcp.run(transport='stdio')