We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/zhangzhongnan928/mcp-pa-ai-agent'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
from typing import List, Dict, Any, Optional
import os
import json
import logging
import re
from datetime import datetime
import httpx
# Import the MCP server instance from the main file
from mcp_server import mcp, Context
logger = logging.getLogger("mcp-pa-agent.tasks")
# Mock tasks database (in a real implementation, this would be a proper database)
TASKS_FILE = "tasks_data.json"
async def get_tasks():
"""Get all tasks from the storage."""
try:
if os.path.exists(TASKS_FILE):
with open(TASKS_FILE, 'r') as f:
return json.load(f)
return []
except Exception as e:
logger.error(f"Error loading tasks: {str(e)}")
return []
async def save_tasks(tasks):
"""Save tasks to the storage."""
try:
with open(TASKS_FILE, 'w') as f:
json.dump(tasks, f, indent=2)
return True
except Exception as e:
logger.error(f"Error saving tasks: {str(e)}")
return False
# Resources
@mcp.resource("tasks://all")
async def all_tasks_resource() -> str:
"""Provide all tasks as a resource"""
tasks = await get_tasks()
return json.dumps(tasks, indent=2)
@mcp.resource("tasks://status/{status}")
async def tasks_by_status_resource(status: str) -> str:
"""Provide tasks filtered by status"""
all_tasks = await get_tasks()
filtered = [t for t in all_tasks if t.get('status', '').lower() == status.lower()]
return json.dumps(filtered, indent=2)
@mcp.resource("tasks://priority/{priority}")
async def tasks_by_priority_resource(priority: str) -> str:
"""Provide tasks filtered by priority"""
all_tasks = await get_tasks()
filtered = [t for t in all_tasks if t.get('priority', '').lower() == priority.lower()]
return json.dumps(filtered, indent=2)
# Prompts
@mcp.prompt()
def create_task_prompt(description: str, priority: str = "medium") -> str:
"""Create a prompt for task creation"""
return f"Please help me create a new task based on this description: '{description}'. It should have {priority} priority. Please organize the information into title, description, and suggest a reasonable due date if applicable."
# Tool functions
@mcp.tool()
async def list_tasks(status: str = "all", ctx: Context = None) -> str:
"""List tasks with optional filtering by status.
Args:
status: Filter tasks by status ('all', 'pending', 'completed', 'in_progress')
"""
if ctx:
ctx.info(f"Listing tasks with status filter: {status}")
tasks = await get_tasks()
if not tasks:
return "No tasks found."
filtered_tasks = tasks
if status.lower() != "all":
filtered_tasks = [task for task in tasks if task.get('status', '').lower() == status.lower()]
if not filtered_tasks:
return f"No tasks with status '{status}' found."
formatted_tasks = []
for idx, task in enumerate(filtered_tasks, 1):
due_date = task.get('due_date', 'No due date')
created_at = task.get('created_at', 'Unknown')
updated_at = task.get('updated_at', created_at)
formatted_tasks.append(f"""
Task #{task.get('id', idx)}: {task.get('title', 'Untitled')}
Status: {task.get('status', 'Not specified')}
Priority: {task.get('priority', 'Not specified')}
Due Date: {due_date}
Description: {task.get('description', 'No description provided')}
Created: {created_at}
Last Updated: {updated_at}
""")
return "\n---\n".join(formatted_tasks)
@mcp.tool()
async def add_task(title: str, description: str = "", priority: str = "medium", due_date: str = "", ctx: Context = None) -> str:
"""Add a new task.
Args:
title: Title of the task
description: Optional description of the task
priority: Priority level ('low', 'medium', 'high')
due_date: Optional due date in YYYY-MM-DD format
"""
if ctx:
ctx.info(f"Adding new task: {title}")
tasks = await get_tasks()
# Validate title
if not title or len(title.strip()) == 0:
error_msg = "Error: Task title cannot be empty."
if ctx:
ctx.error(error_msg)
return error_msg
# Validate priority
valid_priorities = ['low', 'medium', 'high']
if priority.lower() not in valid_priorities:
error_msg = f"Invalid priority: {priority}. Must be one of: {', '.join(valid_priorities)}."
if ctx:
ctx.error(error_msg)
return error_msg
# Validate due date format if provided
if due_date:
date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}$')
if not date_pattern.match(due_date):
error_msg = f"Invalid due date format: {due_date}. Please use YYYY-MM-DD."
if ctx:
ctx.error(error_msg)
return error_msg
try:
datetime.strptime(due_date, '%Y-%m-%d')
except ValueError:
error_msg = f"Invalid due date: {due_date}. Please use a valid date in YYYY-MM-DD format."
if ctx:
ctx.error(error_msg)
return error_msg
# Generate a unique ID
task_id = 1
if tasks:
existing_ids = [task.get('id', 0) for task in tasks]
task_id = max(existing_ids) + 1
new_task = {
'id': task_id,
'title': title,
'description': description,
'status': 'pending',
'priority': priority.lower(),
'due_date': due_date,
'created_at': datetime.now().isoformat(),
}
tasks.append(new_task)
if await save_tasks(tasks):
success_msg = f"Task '{title}' added successfully with ID {new_task['id']}."
if ctx:
ctx.info(success_msg)
return success_msg
else:
error_msg = "Failed to save the task. Please try again."
if ctx:
ctx.error(error_msg)
return error_msg
@mcp.tool()
async def update_task_status(task_id: int, status: str, ctx: Context = None) -> str:
"""Update the status of a task.
Args:
task_id: The ID of the task to update
status: New status ('pending', 'in_progress', 'completed')
"""
if ctx:
ctx.info(f"Updating task #{task_id} status to: {status}")
# Validate status
valid_statuses = ['pending', 'in_progress', 'completed']
if status.lower() not in valid_statuses:
error_msg = f"Invalid status: {status}. Must be one of: {', '.join(valid_statuses)}."
if ctx:
ctx.error(error_msg)
return error_msg
tasks = await get_tasks()
# Find the task with the given ID
task_found = False
for task in tasks:
if task.get('id') == task_id:
task_found = True
old_status = task.get('status', 'unknown')
task['status'] = status.lower()
task['updated_at'] = datetime.now().isoformat()
if await save_tasks(tasks):
success_msg = f"Task #{task_id} status updated from '{old_status}' to '{status}'."
if ctx:
ctx.info(success_msg)
return success_msg
else:
error_msg = "Failed to update task status. Please try again."
if ctx:
ctx.error(error_msg)
return error_msg
error_msg = f"Task with ID {task_id} not found."
if ctx:
ctx.error(error_msg)
return error_msg
@mcp.tool()
async def delete_task(task_id: int, ctx: Context = None) -> str:
"""Delete a task.
Args:
task_id: The ID of the task to delete
"""
if ctx:
ctx.info(f"Deleting task #{task_id}")
tasks = await get_tasks()
# Find and remove the task with the given ID
original_count = len(tasks)
tasks = [task for task in tasks if task.get('id') != task_id]
if len(tasks) < original_count:
if await save_tasks(tasks):
success_msg = f"Task #{task_id} deleted successfully."
if ctx:
ctx.info(success_msg)
return success_msg
else:
error_msg = "Failed to delete task. Please try again."
if ctx:
ctx.error(error_msg)
return error_msg
else:
error_msg = f"Task with ID {task_id} not found."
if ctx:
ctx.error(error_msg)
return error_msg