# tasks.py (9.33 kB)
from typing import List, Dict, Any
from mcp.types import Tool, TextContent
import json
from ..schemas import DynamicSchemaBuilder
class TaskTools:
    """Task management tools for ClickUp MCP.

    Declares the task-related tools (create, get, update, delete, list,
    search) and routes incoming tool calls to the wrapped ``client``.
    Client methods are invoked directly (they are not awaited), and their
    return values are passed back to the caller unchanged.
    """

    # Arguments whose key starts with this prefix are treated as custom-field
    # values; the remainder of the key is used as the custom field ID.
    _CUSTOM_FIELD_PREFIX = "custom_field_"

    # Standard fields copied verbatim from the tool arguments into the task
    # payload ("name" is handled separately on create because it is required).
    _CREATE_FIELDS = ("assignees", "status", "priority", "due_date", "tags", "parent")
    _UPDATE_FIELDS = ("name", "status", "priority", "due_date", "assignees")

    # Optional filters forwarded as keyword arguments to the client.
    _LIST_PARAMS = (
        "archived", "page", "order_by", "reverse", "subtasks", "statuses",
        "include_closed", "assignees", "tags", "due_date_gt", "due_date_lt",
        "date_created_gt", "date_created_lt", "date_updated_gt", "date_updated_lt",
    )
    _SEARCH_PARAMS = (
        "page", "order_by", "reverse", "subtasks", "space_ids", "list_ids",
        "statuses", "assignees", "tags",
    )

    def __init__(self, client):
        """Store the client and set up the schema builder and field cache."""
        self.client = client
        self.schema_builder = DynamicSchemaBuilder()
        # Maps list ID -> custom field definitions fetched for that list.
        self._custom_fields_cache = {}

    def get_tools(self) -> List[Tool]:
        """Get all task-related tools with dynamic schemas."""
        return [
            Tool(
                name="create_task",
                description="Create a new task in ClickUp",
                inputSchema=self.schema_builder.get_create_task_schema(),
            ),
            Tool(
                name="get_task",
                description="Get details of a specific task",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "task_id": {
                            "type": "string",
                            "description": "The ID of the task to retrieve",
                        },
                        "include_subtasks": {
                            "type": "boolean",
                            "description": "Include subtasks in the response (optional)",
                        },
                        "include_closed": {
                            "type": "boolean",
                            "description": "Include closed subtasks (optional)",
                        },
                    },
                    "required": ["task_id"],
                },
            ),
            Tool(
                name="update_task",
                description="Update an existing task",
                inputSchema=self.schema_builder.get_update_task_schema(),
            ),
            Tool(
                name="delete_task",
                description="Delete a task",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "task_id": {
                            "type": "string",
                            "description": "The ID of the task to delete",
                        },
                    },
                    "required": ["task_id"],
                },
            ),
            Tool(
                name="list_tasks",
                description="Get tasks from a list with optional filtering",
                inputSchema=self.schema_builder.get_list_tasks_schema(),
            ),
            Tool(
                name="search_tasks",
                description="Search tasks across a workspace",
                inputSchema=self.schema_builder.get_search_tasks_schema(),
            ),
        ]

    async def handle_tool_call(self, name: str, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle task tool calls.

        Runs the handler registered for ``name`` and returns its result as
        indented JSON text. Any exception (including an unknown tool name or
        a missing required argument) is reported as an ``Error: ...`` text
        item instead of being raised.
        """
        handlers = {
            "create_task": self._create_task,
            "get_task": self._get_task,
            "update_task": self._update_task,
            "delete_task": self._delete_task,
            "list_tasks": self._list_tasks,
            "search_tasks": self._search_tasks,
        }
        try:
            handler = handlers.get(name)
            if handler is None:
                raise ValueError(f"Unknown tool: {name}")
            result = await handler(arguments)
            return [TextContent(type="text", text=json.dumps(result, indent=2))]
        except Exception as e:
            # Tool-call boundary: surface the failure to the caller as text.
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    @staticmethod
    def _pick(args: Dict[str, Any], keys) -> Dict[str, Any]:
        """Return the subset of ``args`` whose keys appear in ``keys``, in ``keys`` order."""
        return {key: args[key] for key in keys if key in args}

    def _extract_custom_fields(self, args: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Collect ``custom_field_<id>`` arguments as ``{"id", "value"}`` entries."""
        prefix = self._CUSTOM_FIELD_PREFIX
        # Slice off only the leading prefix; str.replace would also strip the
        # marker from the middle of a field ID and corrupt it.
        return [
            {"id": key[len(prefix):], "value": value}
            for key, value in args.items()
            if key.startswith(prefix)
        ]

    def _build_task_data(self, args: Dict[str, Any], standard_fields) -> Dict[str, Any]:
        """Build the task payload shared by create and update.

        Copies the given standard fields, then at most one description field
        (``markdown_content`` wins over ``description``), then any custom
        fields under ``custom_fields``.
        """
        task_data = self._pick(args, standard_fields)

        # Handle description fields - only use one
        if "markdown_content" in args:
            task_data["markdown_content"] = args["markdown_content"]
        elif "description" in args:
            task_data["description"] = args["description"]

        custom_fields = self._extract_custom_fields(args)
        if custom_fields:
            task_data["custom_fields"] = custom_fields
        return task_data

    async def _create_task(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new task with dynamic field handling.

        Requires ``list_id`` and ``name`` in ``args`` (``KeyError`` otherwise)
        and returns whatever ``client.create_task`` returns.
        """
        list_id = args["list_id"]
        task_data = {"name": args["name"]}
        task_data.update(self._build_task_data(args, self._CREATE_FIELDS))
        return self.client.create_task(list_id, task_data)

    async def _get_task(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Get task details and opportunistically cache the list's custom fields.

        Requires ``task_id`` in ``args`` (``KeyError`` otherwise).
        """
        task_id = args["task_id"]
        # NOTE(review): the tool schema accepts "include_subtasks" and
        # "include_closed", but they are not forwarded - the client is called
        # with the task ID only. Forwarding them needs confirmation that
        # client.get_task accepts these options.
        task = self.client.get_task(task_id)

        # If this is the first time we see this list, cache custom fields
        if "list" in task and "id" in task["list"]:
            await self._cache_custom_fields_for_list(task["list"]["id"])
        return task

    async def _update_task(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Update a task with dynamic field handling.

        Requires ``task_id`` in ``args`` (``KeyError`` otherwise); only the
        fields present in ``args`` are included in the payload.
        """
        task_id = args["task_id"]
        task_data = self._build_task_data(args, self._UPDATE_FIELDS)
        return self.client.update_task(task_id, task_data)

    async def _delete_task(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Delete the task identified by ``args["task_id"]``."""
        return self.client.delete_task(args["task_id"])

    async def _list_tasks(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """List tasks of ``args["list_id"]``, forwarding any supported filters.

        Returns ``{"tasks": ..., "count": len(tasks)}``.
        """
        list_id = args["list_id"]
        params = self._pick(args, self._LIST_PARAMS)
        tasks = self.client.get_tasks(list_id, **params)
        return {"tasks": tasks, "count": len(tasks)}

    async def _search_tasks(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Search tasks in a workspace, forwarding any supported filters.

        Requires ``workspace_id`` and ``query`` in ``args`` and returns
        ``{"tasks": ..., "count": len(tasks), "query": query}``.
        """
        workspace_id = args["workspace_id"]
        query = args["query"]
        params = self._pick(args, self._SEARCH_PARAMS)
        tasks = self.client.search_tasks(workspace_id, query, **params)
        return {"tasks": tasks, "count": len(tasks), "query": query}

    async def _cache_custom_fields_for_list(self, list_id: str):
        """Cache custom fields for a list to enable dynamic schema generation.

        Does nothing when the list is already cached. A failed lookup is
        cached as an empty list, so it is not retried for that list.
        """
        if list_id in self._custom_fields_cache:
            return
        try:
            # Get list details which should include custom fields
            list_details = self.client._make_request("GET", f"list/{list_id}")
            custom_fields = list_details.get("custom_fields", [])
        except Exception:
            # Best effort: if we can't get custom fields, cache empty list
            custom_fields = []
        self._custom_fields_cache[list_id] = custom_fields

    def get_enhanced_schema_for_list(self, list_id: str, base_schema_method) -> Dict[str, Any]:
        """Get enhanced schema with custom fields for a specific list.

        ``base_schema_method`` is called with no arguments to produce the base
        schema. If ``list_id`` has a cache entry (even an empty one), the
        schema is passed through ``schema_builder.add_custom_fields``;
        otherwise the base schema is returned as is.
        """
        base_schema = base_schema_method()
        if list_id not in self._custom_fields_cache:
            return base_schema
        custom_fields = self._custom_fields_cache[list_id]
        return self.schema_builder.add_custom_fields(base_schema, custom_fields)