Skip to main content
Glama
tasks.py (9.33 kB)
import json
from typing import Any, Dict, List

from mcp.types import TextContent, Tool

from ..schemas import DynamicSchemaBuilder


class TaskTools:
    """Task management tools for ClickUp MCP.

    Exposes six tools (create/get/update/delete/list/search task) and routes
    tool calls to the injected ``client``. The client methods used here
    (``create_task``, ``get_task``, ``update_task``, ``delete_task``,
    ``get_tasks``, ``search_tasks``, ``_make_request``) are called
    synchronously, i.e. their results are returned without being awaited.
    """

    # Arguments named "custom_field_<id>" carry the value for custom field <id>.
    _CUSTOM_FIELD_PREFIX = "custom_field_"

    # Optional arguments copied verbatim into the request when present.
    _CREATE_FIELDS = ("assignees", "status", "priority", "due_date", "tags", "parent")
    _UPDATE_FIELDS = ("name", "status", "priority", "due_date", "assignees")
    _LIST_PARAMS = (
        "archived", "page", "order_by", "reverse", "subtasks",
        "statuses", "include_closed", "assignees", "tags",
        "due_date_gt", "due_date_lt", "date_created_gt",
        "date_created_lt", "date_updated_gt", "date_updated_lt",
    )
    _SEARCH_PARAMS = (
        "page", "order_by", "reverse", "subtasks", "space_ids",
        "list_ids", "statuses", "assignees", "tags",
    )

    def __init__(self, client):
        """Store the API client and set up the schema builder and field cache.

        Args:
            client: Object providing the ClickUp operations called by this class.
        """
        self.client = client
        self.schema_builder = DynamicSchemaBuilder()
        # Maps list ID -> custom field definitions fetched for that list.
        self._custom_fields_cache: Dict[str, Any] = {}

    def get_tools(self) -> List[Tool]:
        """Get all task-related tools with dynamic schemas.

        Returns:
            The ``Tool`` definitions for create_task, get_task, update_task,
            delete_task, list_tasks and search_tasks, in that order.
        """
        return [
            Tool(
                name="create_task",
                description="Create a new task in ClickUp",
                inputSchema=self.schema_builder.get_create_task_schema(),
            ),
            Tool(
                name="get_task",
                description="Get details of a specific task",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "task_id": {
                            "type": "string",
                            "description": "The ID of the task to retrieve",
                        },
                        "include_subtasks": {
                            "type": "boolean",
                            "description": "Include subtasks in the response (optional)",
                        },
                        "include_closed": {
                            "type": "boolean",
                            "description": "Include closed subtasks (optional)",
                        },
                    },
                    "required": ["task_id"],
                },
            ),
            Tool(
                name="update_task",
                description="Update an existing task",
                inputSchema=self.schema_builder.get_update_task_schema(),
            ),
            Tool(
                name="delete_task",
                description="Delete a task",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "task_id": {
                            "type": "string",
                            "description": "The ID of the task to delete",
                        }
                    },
                    "required": ["task_id"],
                },
            ),
            Tool(
                name="list_tasks",
                description="Get tasks from a list with optional filtering",
                inputSchema=self.schema_builder.get_list_tasks_schema(),
            ),
            Tool(
                name="search_tasks",
                description="Search tasks across a workspace",
                inputSchema=self.schema_builder.get_search_tasks_schema(),
            ),
        ]

    async def handle_tool_call(self, name: str, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle task tool calls.

        Args:
            name: Tool name, one of the names returned by ``get_tools``.
            arguments: Tool arguments as supplied by the caller.

        Returns:
            A single-element list holding either the JSON-encoded result or,
            on any failure (including an unknown tool name), a text of the
            form ``"Error: <message>"``. Exceptions are never propagated.
        """
        handlers = {
            "create_task": self._create_task,
            "get_task": self._get_task,
            "update_task": self._update_task,
            "delete_task": self._delete_task,
            "list_tasks": self._list_tasks,
            "search_tasks": self._search_tasks,
        }
        try:
            handler = handlers.get(name)
            if handler is None:
                raise ValueError(f"Unknown tool: {name}")
            result = await handler(arguments)
            return [TextContent(type="text", text=json.dumps(result, indent=2))]
        except Exception as e:
            # Tool-call boundary: report every failure as text instead of raising.
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    @staticmethod
    def _copy_present(args: Dict[str, Any], fields) -> Dict[str, Any]:
        """Return the subset of ``args`` whose keys appear in ``fields``, in ``fields`` order."""
        return {field: args[field] for field in fields if field in args}

    @staticmethod
    def _description_fields(args: Dict[str, Any]) -> Dict[str, Any]:
        """Return the description entry to send; ``markdown_content`` wins over ``description``."""
        if "markdown_content" in args:
            return {"markdown_content": args["markdown_content"]}
        if "description" in args:
            return {"description": args["description"]}
        return {}

    @classmethod
    def _extract_custom_fields(cls, args: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Collect ``custom_field_<id>`` arguments as ``{"id": <id>, "value": ...}`` entries."""
        prefix = cls._CUSTOM_FIELD_PREFIX
        # Slice off only the leading prefix; str.replace would also strip any
        # later occurrence of the prefix text inside the field ID itself.
        return [
            {"id": key[len(prefix):], "value": value}
            for key, value in args.items()
            if key.startswith(prefix)
        ]

    async def _create_task(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new task with dynamic field handling.

        Args:
            args: Must contain ``list_id`` and ``name``; may contain the
                standard optional fields, one description field and any
                number of ``custom_field_<id>`` entries.

        Returns:
            Whatever ``client.create_task`` returns.

        Raises:
            KeyError: If ``list_id`` or ``name`` is missing.
        """
        list_id = args["list_id"]
        task_data = {"name": args["name"]}
        task_data.update(self._copy_present(args, self._CREATE_FIELDS))
        task_data.update(self._description_fields(args))

        custom_fields = self._extract_custom_fields(args)
        if custom_fields:
            task_data["custom_fields"] = custom_fields

        return self.client.create_task(list_id, task_data)

    async def _get_task(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Get task details and cache the custom fields of the task's list.

        Args:
            args: Must contain ``task_id``.

        Returns:
            Whatever ``client.get_task`` returns.

        Raises:
            KeyError: If ``task_id`` is missing.
        """
        # NOTE(review): the tool schema advertises include_subtasks and
        # include_closed, but they are not forwarded to client.get_task (the
        # previous code built a params dict that was never used). Forward them
        # once the client's get_task signature is confirmed to accept them.
        task = self.client.get_task(args["task_id"])

        # Cache the custom fields of the list this task belongs to, if known.
        list_info = task.get("list")
        if isinstance(list_info, dict) and "id" in list_info:
            await self._cache_custom_fields_for_list(list_info["id"])

        return task

    async def _update_task(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Update a task with dynamic field handling.

        Args:
            args: Must contain ``task_id``; may contain the standard optional
                fields, one description field and any number of
                ``custom_field_<id>`` entries.

        Returns:
            Whatever ``client.update_task`` returns.

        Raises:
            KeyError: If ``task_id`` is missing.
        """
        task_id = args["task_id"]
        task_data = self._copy_present(args, self._UPDATE_FIELDS)
        task_data.update(self._description_fields(args))

        custom_fields = self._extract_custom_fields(args)
        if custom_fields:
            task_data["custom_fields"] = custom_fields

        return self.client.update_task(task_id, task_data)

    async def _delete_task(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Delete the task identified by ``args["task_id"]``."""
        return self.client.delete_task(args["task_id"])

    async def _list_tasks(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """List tasks of a list with optional filtering.

        Args:
            args: Must contain ``list_id``; recognised optional filters are
                passed to ``client.get_tasks`` as keyword arguments.

        Returns:
            ``{"tasks": <client result>, "count": len(<client result>)}``.
        """
        list_id = args["list_id"]
        params = self._copy_present(args, self._LIST_PARAMS)
        tasks = self.client.get_tasks(list_id, **params)
        return {"tasks": tasks, "count": len(tasks)}

    async def _search_tasks(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Search tasks in a workspace.

        Args:
            args: Must contain ``workspace_id`` and ``query``; recognised
                optional filters are passed to ``client.search_tasks`` as
                keyword arguments.

        Returns:
            ``{"tasks": <client result>, "count": len(<client result>),
            "query": <query>}``.
        """
        workspace_id = args["workspace_id"]
        query = args["query"]
        params = self._copy_present(args, self._SEARCH_PARAMS)
        tasks = self.client.search_tasks(workspace_id, query, **params)
        return {"tasks": tasks, "count": len(tasks), "query": query}

    async def _cache_custom_fields_for_list(self, list_id: str):
        """Cache custom fields for a list to enable dynamic schema generation.

        Does nothing when ``list_id`` is already cached. On any failure an
        empty list is cached, so the lookup is attempted once per list.
        """
        if list_id in self._custom_fields_cache:
            return

        try:
            # Get list details, which should include custom fields.
            list_details = self.client._make_request("GET", f"list/{list_id}")
            custom_fields = list_details.get("custom_fields", [])
        except Exception:
            # Best effort: schema enrichment must not break the calling tool.
            custom_fields = []
        self._custom_fields_cache[list_id] = custom_fields

    def get_enhanced_schema_for_list(self, list_id: str, base_schema_method) -> Dict[str, Any]:
        """Get enhanced schema with custom fields for a specific list.

        Args:
            list_id: List whose cached custom fields should be added.
            base_schema_method: Zero-argument callable returning the base schema.

        Returns:
            The base schema passed through ``schema_builder.add_custom_fields``
            when the list is cached, otherwise the base schema unchanged.
        """
        base_schema = base_schema_method()
        if list_id in self._custom_fields_cache:
            custom_fields = self._custom_fields_cache[list_id]
            return self.schema_builder.add_custom_fields(base_schema, custom_fields)
        return base_schema

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yihaoWang/clickup-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.