MCP Personal Assistant Agent
- modules
from typing import List, Dict, Any, Optional
import os
import json
import logging
import re
from datetime import datetime
import httpx
# Import the MCP server instance from the main file
from mcp_server import mcp, Context
logger = logging.getLogger("mcp-pa-agent.tasks")
# Mock tasks database (in a real implementation, this would be a proper database)
TASKS_FILE = "tasks_data.json"
async def get_tasks():
"""Get all tasks from the storage."""
try:
if os.path.exists(TASKS_FILE):
with open(TASKS_FILE, 'r') as f:
return json.load(f)
return []
except Exception as e:
logger.error(f"Error loading tasks: {str(e)}")
return []
async def save_tasks(tasks):
"""Save tasks to the storage."""
try:
with open(TASKS_FILE, 'w') as f:
json.dump(tasks, f, indent=2)
return True
except Exception as e:
logger.error(f"Error saving tasks: {str(e)}")
return False
# Resources
@mcp.resource("tasks://all")
async def all_tasks_resource() -> str:
"""Provide all tasks as a resource"""
tasks = await get_tasks()
return json.dumps(tasks, indent=2)
@mcp.resource("tasks://status/{status}")
async def tasks_by_status_resource(status: str) -> str:
"""Provide tasks filtered by status"""
all_tasks = await get_tasks()
filtered = [t for t in all_tasks if t.get('status', '').lower() == status.lower()]
return json.dumps(filtered, indent=2)
@mcp.resource("tasks://priority/{priority}")
async def tasks_by_priority_resource(priority: str) -> str:
"""Provide tasks filtered by priority"""
all_tasks = await get_tasks()
filtered = [t for t in all_tasks if t.get('priority', '').lower() == priority.lower()]
return json.dumps(filtered, indent=2)
# Prompts
@mcp.prompt()
def create_task_prompt(description: str, priority: str = "medium") -> str:
"""Create a prompt for task creation"""
return f"Please help me create a new task based on this description: '{description}'. It should have {priority} priority. Please organize the information into title, description, and suggest a reasonable due date if applicable."
# Tool functions
@mcp.tool()
async def list_tasks(status: str = "all", ctx: Context = None) -> str:
"""List tasks with optional filtering by status.
Args:
status: Filter tasks by status ('all', 'pending', 'completed', 'in_progress')
"""
if ctx:
ctx.info(f"Listing tasks with status filter: {status}")
tasks = await get_tasks()
if not tasks:
return "No tasks found."
filtered_tasks = tasks
if status.lower() != "all":
filtered_tasks = [task for task in tasks if task.get('status', '').lower() == status.lower()]
if not filtered_tasks:
return f"No tasks with status '{status}' found."
formatted_tasks = []
for idx, task in enumerate(filtered_tasks, 1):
due_date = task.get('due_date', 'No due date')
created_at = task.get('created_at', 'Unknown')
updated_at = task.get('updated_at', created_at)
formatted_tasks.append(f"""
Task #{task.get('id', idx)}: {task.get('title', 'Untitled')}
Status: {task.get('status', 'Not specified')}
Priority: {task.get('priority', 'Not specified')}
Due Date: {due_date}
Description: {task.get('description', 'No description provided')}
Created: {created_at}
Last Updated: {updated_at}
""")
return "\n---\n".join(formatted_tasks)
@mcp.tool()
async def add_task(title: str, description: str = "", priority: str = "medium", due_date: str = "", ctx: Context = None) -> str:
"""Add a new task.
Args:
title: Title of the task
description: Optional description of the task
priority: Priority level ('low', 'medium', 'high')
due_date: Optional due date in YYYY-MM-DD format
"""
if ctx:
ctx.info(f"Adding new task: {title}")
tasks = await get_tasks()
# Validate title
if not title or len(title.strip()) == 0:
error_msg = "Error: Task title cannot be empty."
if ctx:
ctx.error(error_msg)
return error_msg
# Validate priority
valid_priorities = ['low', 'medium', 'high']
if priority.lower() not in valid_priorities:
error_msg = f"Invalid priority: {priority}. Must be one of: {', '.join(valid_priorities)}."
if ctx:
ctx.error(error_msg)
return error_msg
# Validate due date format if provided
if due_date:
date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}$')
if not date_pattern.match(due_date):
error_msg = f"Invalid due date format: {due_date}. Please use YYYY-MM-DD."
if ctx:
ctx.error(error_msg)
return error_msg
try:
datetime.strptime(due_date, '%Y-%m-%d')
except ValueError:
error_msg = f"Invalid due date: {due_date}. Please use a valid date in YYYY-MM-DD format."
if ctx:
ctx.error(error_msg)
return error_msg
# Generate a unique ID
task_id = 1
if tasks:
existing_ids = [task.get('id', 0) for task in tasks]
task_id = max(existing_ids) + 1
new_task = {
'id': task_id,
'title': title,
'description': description,
'status': 'pending',
'priority': priority.lower(),
'due_date': due_date,
'created_at': datetime.now().isoformat(),
}
tasks.append(new_task)
if await save_tasks(tasks):
success_msg = f"Task '{title}' added successfully with ID {new_task['id']}."
if ctx:
ctx.info(success_msg)
return success_msg
else:
error_msg = "Failed to save the task. Please try again."
if ctx:
ctx.error(error_msg)
return error_msg
@mcp.tool()
async def update_task_status(task_id: int, status: str, ctx: Context = None) -> str:
"""Update the status of a task.
Args:
task_id: The ID of the task to update
status: New status ('pending', 'in_progress', 'completed')
"""
if ctx:
ctx.info(f"Updating task #{task_id} status to: {status}")
# Validate status
valid_statuses = ['pending', 'in_progress', 'completed']
if status.lower() not in valid_statuses:
error_msg = f"Invalid status: {status}. Must be one of: {', '.join(valid_statuses)}."
if ctx:
ctx.error(error_msg)
return error_msg
tasks = await get_tasks()
# Find the task with the given ID
task_found = False
for task in tasks:
if task.get('id') == task_id:
task_found = True
old_status = task.get('status', 'unknown')
task['status'] = status.lower()
task['updated_at'] = datetime.now().isoformat()
if await save_tasks(tasks):
success_msg = f"Task #{task_id} status updated from '{old_status}' to '{status}'."
if ctx:
ctx.info(success_msg)
return success_msg
else:
error_msg = "Failed to update task status. Please try again."
if ctx:
ctx.error(error_msg)
return error_msg
error_msg = f"Task with ID {task_id} not found."
if ctx:
ctx.error(error_msg)
return error_msg
@mcp.tool()
async def delete_task(task_id: int, ctx: Context = None) -> str:
"""Delete a task.
Args:
task_id: The ID of the task to delete
"""
if ctx:
ctx.info(f"Deleting task #{task_id}")
tasks = await get_tasks()
# Find and remove the task with the given ID
original_count = len(tasks)
tasks = [task for task in tasks if task.get('id') != task_id]
if len(tasks) < original_count:
if await save_tasks(tasks):
success_msg = f"Task #{task_id} deleted successfully."
if ctx:
ctx.info(success_msg)
return success_msg
else:
error_msg = "Failed to delete task. Please try again."
if ctx:
ctx.error(error_msg)
return error_msg
else:
error_msg = f"Task with ID {task_id} not found."
if ctx:
ctx.error(error_msg)
return error_msg