Skip to main content
Glama
tools.py54.1 kB
"""MCP tool implementations for ClickUp.""" import json import logging from datetime import datetime, timedelta from typing import Any, Callable, Dict, List, Optional from mcp.types import Tool from .client import ClickUpAPIError, ClickUpClient from .models import CreateTaskRequest, Task, UpdateTaskRequest from .utils import format_task_url, parse_duration, parse_task_id logger = logging.getLogger(__name__) class ClickUpTools: """MCP tools for ClickUp operations.""" def __init__(self, client: ClickUpClient) -> None: """Initialize tools with ClickUp client.""" self.client = client self._tools: Dict[str, Callable] = { "create_task": self.create_task, "get_task": self.get_task, "update_task": self.update_task, "delete_task": self.delete_task, "list_tasks": self.list_tasks, "search_tasks": self.search_tasks, "get_subtasks": self.get_subtasks, "get_task_comments": self.get_task_comments, "create_task_comment": self.create_task_comment, "get_task_status": self.get_task_status, "update_task_status": self.update_task_status, "get_assignees": self.get_assignees, "assign_task": self.assign_task, "list_spaces": self.list_spaces, "list_folders": self.list_folders, "list_lists": self.list_lists, "find_list_by_name": self.find_list_by_name, # Bulk operations "bulk_update_tasks": self.bulk_update_tasks, "bulk_move_tasks": self.bulk_move_tasks, # Time tracking "get_time_tracked": self.get_time_tracked, "log_time": self.log_time, # Templates "create_task_from_template": self.create_task_from_template, "create_task_chain": self.create_task_chain, # Analytics "get_team_workload": self.get_team_workload, "get_task_analytics": self.get_task_analytics, # User management "list_users": self.list_users, "get_current_user": self.get_current_user, "find_user_by_name": self.find_user_by_name, } def get_tool_definitions(self) -> List[Tool]: """Get all tool definitions for MCP.""" return [ Tool( name="create_task", description="Create a new task in a specific list", inputSchema={ "type": "object", "properties": { "title": {"type": "string", "description": "Task title"}, "description": {"type": "string", "description": "Task description"}, "list_name": { "type": "string", "description": "Name of the list to create task in", }, "list_id": { "type": "string", "description": "ID of the list (alternative to list_name)", }, "assignees": { "type": "array", "items": {"type": "integer"}, "description": "User IDs to assign", }, "priority": { "type": "integer", "description": "Priority (1=urgent, 2=high, 3=normal, 4=low)", }, "due_date": {"type": "string", "description": "Due date (ISO 8601 format)"}, "time_estimate": { "type": "string", "description": "Time estimate (e.g., '2h 30m')", }, }, "required": ["title"], }, ), Tool( name="get_task", description="Get task details by ID (supports various ID formats including project codes)", inputSchema={ "type": "object", "properties": { "task_id": { "type": "string", "description": "Task ID, custom ID (e.g., gh-123), or URL", }, "include_subtasks": { "type": "boolean", "description": "Include subtasks in response", }, }, "required": ["task_id"], }, ), Tool( name="update_task", description="Update task properties", inputSchema={ "type": "object", "properties": { "task_id": {"type": "string", "description": "Task ID"}, "title": {"type": "string", "description": "New title"}, "description": {"type": "string", "description": "New description"}, "status": {"type": "string", "description": "New status"}, "priority": {"type": "integer", "description": "New priority"}, "due_date": {"type": "string", "description": "New due date"}, "assignees_add": { "type": "array", "items": {"type": "integer"}, "description": "User IDs to add as assignees", }, "assignees_remove": { "type": "array", "items": {"type": "integer"}, "description": "User IDs to remove as assignees", }, }, "required": ["task_id"], }, ), Tool( name="delete_task", description="Delete a task", inputSchema={ "type": "object", "properties": { "task_id": {"type": "string", "description": "Task ID to delete"}, }, "required": ["task_id"], }, ), Tool( name="list_tasks", description="List tasks in a list, folder, or space", inputSchema={ "type": "object", "properties": { "list_id": {"type": "string", "description": "List ID"}, "folder_id": {"type": "string", "description": "Folder ID"}, "space_id": {"type": "string", "description": "Space ID"}, "statuses": { "type": "array", "items": {"type": "string"}, "description": "Filter by statuses", }, "assignees": { "type": "array", "items": {"type": "integer"}, "description": "Filter by assignee IDs", }, "include_closed": { "type": "boolean", "description": "Include closed tasks", }, }, }, ), Tool( name="search_tasks", description="Search tasks across workspace", inputSchema={ "type": "object", "properties": { "query": {"type": "string", "description": "Search query"}, "statuses": { "type": "array", "items": {"type": "string"}, "description": "Filter by statuses", }, "assignees": { "type": "array", "items": {"type": "integer"}, "description": "Filter by assignee IDs", }, }, }, ), Tool( name="get_subtasks", description="Get all subtasks of a parent task", inputSchema={ "type": "object", "properties": { "parent_task_id": {"type": "string", "description": "Parent task ID"}, }, "required": ["parent_task_id"], }, ), Tool( name="get_task_comments", description="Get comments on a task", inputSchema={ "type": "object", "properties": { "task_id": {"type": "string", "description": "Task ID"}, }, "required": ["task_id"], }, ), Tool( name="create_task_comment", description="Create a comment on a task", inputSchema={ "type": "object", "properties": { "task_id": {"type": "string", "description": "Task ID"}, "comment_text": {"type": "string", "description": "Comment text"}, "assignee": { "type": "integer", "description": "User ID to assign (optional)", }, "notify_all": { "type": "boolean", "description": "Notify all assignees (default: true)", }, }, "required": ["task_id", "comment_text"], }, ), Tool( name="get_task_status", description="Get current status of a task", inputSchema={ "type": "object", "properties": { "task_id": {"type": "string", "description": "Task ID"}, }, "required": ["task_id"], }, ), Tool( name="update_task_status", description="Update task status", inputSchema={ "type": "object", "properties": { "task_id": {"type": "string", "description": "Task ID"}, "status": {"type": "string", "description": "New status"}, }, "required": ["task_id", "status"], }, ), Tool( name="get_assignees", description="Get assignees of a task", inputSchema={ "type": "object", "properties": { "task_id": {"type": "string", "description": "Task ID"}, }, "required": ["task_id"], }, ), Tool( name="assign_task", description="Assign users to a task", inputSchema={ "type": "object", "properties": { "task_id": {"type": "string", "description": "Task ID"}, "user_ids": { "type": "array", "items": {"type": "integer"}, "description": "User IDs to assign", }, }, "required": ["task_id", "user_ids"], }, ), Tool( name="list_spaces", description="List all spaces in workspace", inputSchema={ "type": "object", "properties": {}, }, ), Tool( name="list_folders", description="List folders in a space", inputSchema={ "type": "object", "properties": { "space_id": {"type": "string", "description": "Space ID"}, }, "required": ["space_id"], }, ), Tool( name="list_lists", description="List all lists in a folder or space", inputSchema={ "type": "object", "properties": { "folder_id": {"type": "string", "description": "Folder ID"}, "space_id": {"type": "string", "description": "Space ID"}, }, }, ), Tool( name="find_list_by_name", description="Find a list by name", inputSchema={ "type": "object", "properties": { "name": {"type": "string", "description": "List name to search for"}, "space_id": {"type": "string", "description": "Space ID to search in"}, }, "required": ["name"], }, ), # Bulk operations Tool( name="bulk_update_tasks", description="Update multiple tasks at once", inputSchema={ "type": "object", "properties": { "task_ids": { "type": "array", "items": {"type": "string"}, "description": "List of task IDs to update", }, "updates": { "type": "object", "description": "Updates to apply (status, priority, assignees, etc.)", }, }, "required": ["task_ids", "updates"], }, ), Tool( name="bulk_move_tasks", description="Move multiple tasks to a different list", inputSchema={ "type": "object", "properties": { "task_ids": { "type": "array", "items": {"type": "string"}, "description": "List of task IDs to move", }, "target_list_id": {"type": "string", "description": "Target list ID"}, }, "required": ["task_ids", "target_list_id"], }, ), # Time tracking Tool( name="get_time_tracked", description="Get time tracked for tasks", inputSchema={ "type": "object", "properties": { "user_id": {"type": "integer", "description": "User ID"}, "start_date": {"type": "string", "description": "Start date (ISO 8601)"}, "end_date": {"type": "string", "description": "End date (ISO 8601)"}, }, }, ), Tool( name="log_time", description="Log time spent on a task", inputSchema={ "type": "object", "properties": { "task_id": {"type": "string", "description": "Task ID"}, "duration": {"type": "string", "description": "Duration (e.g., '2h 30m')"}, "description": {"type": "string", "description": "Optional description"}, }, "required": ["task_id", "duration"], }, ), # Templates Tool( name="create_task_from_template", description="Create a task from a template", inputSchema={ "type": "object", "properties": { "template_name": {"type": "string", "description": "Template name"}, "customizations": { "type": "object", "description": "Customizations to apply to template", }, "list_id": {"type": "string", "description": "List ID for the new task"}, }, "required": ["template_name", "list_id"], }, ), Tool( name="create_task_chain", description="Create a chain of dependent tasks", inputSchema={ "type": "object", "properties": { "tasks": { "type": "array", "items": { "type": "object", "properties": { "title": {"type": "string"}, "description": {"type": "string"}, "time_estimate": {"type": "string"}, }, }, "description": "List of tasks to create in sequence", }, "list_id": {"type": "string", "description": "List ID for the tasks"}, "auto_link": { "type": "boolean", "description": "Automatically link tasks as dependencies", }, }, "required": ["tasks", "list_id"], }, ), # Analytics Tool( name="get_team_workload", description="Get workload distribution across team members", inputSchema={ "type": "object", "properties": { "space_id": {"type": "string", "description": "Space ID"}, "include_completed": { "type": "boolean", "description": "Include completed tasks in analysis", }, }, "required": ["space_id"], }, ), Tool( name="get_task_analytics", description="Get analytics for tasks (velocity, completion rate, etc.)", inputSchema={ "type": "object", "properties": { "space_id": {"type": "string", "description": "Space ID"}, "period_days": { "type": "integer", "description": "Period in days to analyze", }, }, "required": ["space_id"], }, ), # User management tools Tool( name="list_users", description="List all users in the workspace", inputSchema={ "type": "object", "properties": { "workspace_id": { "type": "string", "description": "Workspace ID (optional, uses default if not provided)", }, }, }, ), Tool( name="get_current_user", description="Get details of the currently authenticated user", inputSchema={ "type": "object", "properties": {}, }, ), Tool( name="find_user_by_name", description="Find a user by name or email", inputSchema={ "type": "object", "properties": { "name": { "type": "string", "description": "Name or email to search for", }, "workspace_id": { "type": "string", "description": "Workspace ID (optional, uses default if not provided)", }, }, "required": ["name"], }, ), ] async def call_tool(self, name: str, arguments: Dict[str, Any]) -> str: """Call a tool by name with arguments.""" if name not in self._tools: raise ValueError(f"Unknown tool: {name}") try: result = await self._tools[name](**arguments) return json.dumps(result, indent=2, default=str) except ClickUpAPIError as e: logger.error(f"ClickUp API error in {name}: {e}") return json.dumps({"error": str(e), "type": "api_error"}) except Exception as e: logger.error(f"Error in {name}: {e}", exc_info=True) return json.dumps({"error": str(e), "type": "internal_error"}) # Tool implementations async def _resolve_task_id(self, task_id: str, include_subtasks: bool = False) -> Task: """Smart task ID resolution that handles both internal and custom IDs.""" # Parse task ID to determine if it might be a custom ID parsed_id, custom_type = parse_task_id(task_id, self.client.config.id_patterns) # Try direct lookup first (works for both internal and custom IDs) try: return await self.client.get_task(parsed_id, include_subtasks=include_subtasks) except ClickUpAPIError as direct_error: # If it might be a custom ID, try with custom_task_ids=true if custom_type or "-" in parsed_id: try: team_id = ( self.client.config.default_team_id or self.client.config.default_workspace_id ) return await self.client.get_task( parsed_id, include_subtasks=include_subtasks, custom_task_ids=True, team_id=team_id, ) except ClickUpAPIError as custom_error: # If both fail, try search as final fallback try: tasks = await self.client.search_tasks(query=task_id) if not tasks: raise ClickUpAPIError(f"Task '{task_id}' not found") # Find exact match by custom_id or use first result for task in tasks: if hasattr(task, "custom_id") and task.custom_id == task_id: return task return tasks[0] except ClickUpAPIError: # Re-raise the most relevant error raise (custom_error if custom_type else direct_error) from None else: # Not a custom ID pattern, re-raise the original error raise direct_error async def create_task( self, title: str, description: Optional[str] = None, list_name: Optional[str] = None, list_id: Optional[str] = None, assignees: Optional[List[int]] = None, priority: Optional[int] = None, due_date: Optional[str] = None, time_estimate: Optional[str] = None, **kwargs: Any, ) -> Dict[str, Any]: """Create a new task.""" # Find list ID if name provided if not list_id and not list_name: raise ValueError("Either list_id or list_name must be provided") if not list_id: lst = await self.client.find_list_by_name(list_name) if not lst: return {"error": f"List '{list_name}' not found"} list_id = lst.id # Build task request task_request = CreateTaskRequest(name=title) if description: task_request.description = description if assignees: task_request.assignees = assignees if priority: task_request.priority = priority if due_date: # Parse ISO date to unix timestamp dt = datetime.fromisoformat(due_date.replace("Z", "+00:00")) task_request.due_date = int(dt.timestamp() * 1000) task_request.due_date_time = True if time_estimate: task_request.time_estimate = parse_duration(time_estimate) # Create task task = await self.client.create_task(list_id, task_request) result = { "id": task.id, "name": task.name, "status": task.status.status, "url": format_task_url(task.id), "list": task.list.get("name", "Unknown"), "created": True, } if task.custom_id: result["custom_id"] = task.custom_id return result async def get_task( self, task_id: str, include_subtasks: bool = False, ) -> Dict[str, Any]: """Get task details.""" try: task = await self._resolve_task_id(task_id, include_subtasks) except ClickUpAPIError as e: return {"error": f"Failed to get task '{task_id}': {e!s}"} result = { "id": task.id, "name": task.name, "description": task.description or "", "status": task.status.status, "priority": task.priority.value if task.priority else None, "assignees": [{"id": u.id, "username": u.username} for u in task.assignees], "creator": {"id": task.creator.id, "username": task.creator.username}, "list": task.list.get("name", "Unknown"), "space": task.space.get("name", "Unknown"), "url": format_task_url(task.id), "tags": task.tags, } if task.custom_id: result["custom_id"] = task.custom_id if task.due_date: result["due_date"] = task.due_date.isoformat() if task.time_estimate: result["time_estimate"] = task.time_estimate // 1000 // 60 # Convert to minutes if task.time_spent: result["time_spent"] = task.time_spent // 1000 // 60 # Convert to minutes if include_subtasks and task.parent: # Fetch subtasks subtasks = await self.get_subtasks(task.parent) result["subtasks"] = subtasks return result async def update_task( self, task_id: str, title: Optional[str] = None, description: Optional[str] = None, status: Optional[str] = None, priority: Optional[int] = None, due_date: Optional[str] = None, assignees_add: Optional[List[int]] = None, assignees_remove: Optional[List[int]] = None, **kwargs: Any, ) -> Dict[str, Any]: """Update task properties.""" try: # First resolve the task to get the internal ID resolved_task = await self._resolve_task_id(task_id) parsed_id = resolved_task.id except ClickUpAPIError as e: return {"error": f"Failed to resolve task '{task_id}': {e!s}"} update_request = UpdateTaskRequest() if title: update_request.name = title if description is not None: update_request.description = description if status: update_request.status = status if priority: update_request.priority = priority if due_date: dt = datetime.fromisoformat(due_date.replace("Z", "+00:00")) update_request.due_date = int(dt.timestamp() * 1000) update_request.due_date_time = True if assignees_add or assignees_remove: update_request.assignees = {} if assignees_add: update_request.assignees["add"] = assignees_add if assignees_remove: update_request.assignees["rem"] = assignees_remove task = await self.client.update_task(parsed_id, update_request) return { "id": task.id, "name": task.name, "status": task.status.status, "updated": True, } async def delete_task(self, task_id: str) -> Dict[str, Any]: """Delete a task.""" try: # First resolve the task to get the internal ID task = await self._resolve_task_id(task_id) await self.client.delete_task(task.id) return {"id": task.id, "deleted": True} except ClickUpAPIError as e: return {"error": f"Failed to delete task '{task_id}': {e!s}"} async def list_tasks( self, list_id: Optional[str] = None, folder_id: Optional[str] = None, space_id: Optional[str] = None, statuses: Optional[List[str]] = None, assignees: Optional[List[int]] = None, include_closed: bool = False, **kwargs: Any, ) -> Dict[str, Any]: """List tasks with filters.""" tasks = await self.client.get_tasks( list_id=list_id, folder_id=folder_id, space_id=space_id, statuses=statuses, assignees=assignees, include_closed=include_closed, ) return { "tasks": [ { "id": task.id, "name": task.name, "status": task.status.status, "assignees": [u.username for u in task.assignees], "url": format_task_url(task.id), } for task in tasks ], "count": len(tasks), } async def search_tasks( self, query: Optional[str] = None, statuses: Optional[List[str]] = None, assignees: Optional[List[int]] = None, **kwargs: Any, ) -> Dict[str, Any]: """Search tasks.""" tasks = await self.client.search_tasks( query=query, statuses=statuses, assignees=assignees, ) return { "tasks": [ { "id": task.id, "name": task.name, "status": task.status.status, "list": task.list.get("name", "Unknown"), "space": task.space.get("name", "Unknown"), "url": format_task_url(task.id), } for task in tasks ], "count": len(tasks), } async def get_subtasks(self, parent_task_id: str) -> Dict[str, Any]: """Get subtasks of a parent task.""" try: # First resolve the task to get the internal ID task = await self._resolve_task_id(parent_task_id) # Use the client's proper subtasks method subtasks = await self.client.get_subtasks(task.id) return { "parent_id": task.id, "subtasks": [ { "id": task.id, "name": task.name, "status": task.status.status, "assignees": [u.username for u in task.assignees], "url": format_task_url(task.id), "custom_id": task.custom_id if task.custom_id else None, } for task in subtasks ], "count": len(subtasks), } except Exception as e: return {"error": f"Failed to get subtasks for '{parent_task_id}': {e!s}"} async def get_task_comments(self, task_id: str) -> Dict[str, Any]: """Get task comments.""" try: # First resolve the task to get the internal ID task = await self._resolve_task_id(task_id) comments = await self.client.get_task_comments(task.id) except ClickUpAPIError as e: return {"error": f"Failed to get comments for task '{task_id}': {e!s}"} return { "task_id": task.id, "comments": [ { "id": comment["id"], "text": comment["comment_text"], "user": comment["user"]["username"], "date": comment["date"], } for comment in comments ], "count": len(comments), } async def create_task_comment( self, task_id: str, comment_text: str, assignee: Optional[int] = None, notify_all: bool = True, ) -> Dict[str, Any]: """Create a comment on a task.""" try: # First resolve the task to get the internal ID task = await self._resolve_task_id(task_id) comment_data = await self.client.create_task_comment( task.id, comment_text, assignee, notify_all ) except ClickUpAPIError as e: return {"error": f"Failed to create comment on task '{task_id}': {e!s}"} return { "task_id": task.id, "comment_id": comment_data.get("id"), "comment_text": comment_text, "created": True, "notify_all": notify_all, } async def get_task_status(self, task_id: str) -> Dict[str, Any]: """Get task status.""" try: task = await self._resolve_task_id(task_id) except ClickUpAPIError as e: return {"error": f"Failed to get task status for '{task_id}': {e!s}"} return { "task_id": task.id, "status": task.status.status, "status_color": task.status.color, "status_type": task.status.type, } async def update_task_status(self, task_id: str, status: str) -> Dict[str, Any]: """Update task status.""" try: # First resolve the task to get the internal ID resolved_task = await self._resolve_task_id(task_id) update_request = UpdateTaskRequest(status=status) task = await self.client.update_task(resolved_task.id, update_request) except ClickUpAPIError as e: return {"error": f"Failed to update task status for '{task_id}': {e!s}"} return { "task_id": task.id, "status": task.status.status, "updated": True, } async def get_assignees(self, task_id: str) -> Dict[str, Any]: """Get task assignees.""" try: task = await self._resolve_task_id(task_id) except ClickUpAPIError as e: return {"error": f"Failed to get assignees for task '{task_id}': {e!s}"} return { "task_id": task.id, "assignees": [ { "id": user.id, "username": user.username, "email": user.email, "initials": user.initials, } for user in task.assignees ], "count": len(task.assignees), } async def assign_task(self, task_id: str, user_ids: List[int]) -> Dict[str, Any]: """Assign users to task.""" try: # First resolve the task to get the internal ID resolved_task = await self._resolve_task_id(task_id) update_request = UpdateTaskRequest(assignees={"add": user_ids}) task = await self.client.update_task(resolved_task.id, update_request) except ClickUpAPIError as e: return {"error": f"Failed to assign users to task '{task_id}': {e!s}"} return { "task_id": task.id, "assignees": [u.username for u in task.assignees], "updated": True, } async def list_spaces(self) -> Dict[str, Any]: """List all spaces.""" spaces = await self.client.get_spaces() return { "spaces": [ { "id": space.id, "name": space.name, "private": space.private, "color": space.color, } for space in spaces ], "count": len(spaces), } async def list_folders(self, space_id: str) -> Dict[str, Any]: """List folders in space.""" folders = await self.client.get_folders(space_id) return { "space_id": space_id, "folders": [ { "id": folder.id, "name": folder.name, "task_count": folder.task_count, "lists": [ { "id": lst.id, "name": lst.name, "task_count": lst.task_count, } for lst in folder.lists ], } for folder in folders ], "count": len(folders), } async def list_lists( self, folder_id: Optional[str] = None, space_id: Optional[str] = None, ) -> Dict[str, Any]: """List all lists.""" if not folder_id and not space_id: # List all lists from all spaces spaces = await self.client.get_spaces() all_lists = [] for space in spaces: lists = await self.client.get_lists(space_id=space.id) all_lists.extend(lists) else: all_lists = await self.client.get_lists(folder_id=folder_id, space_id=space_id) return { "lists": [ { "id": lst.id, "name": lst.name, "space": lst.space.get("name", "Unknown"), "folder": lst.folder.get("name") if lst.folder else None, } for lst in all_lists ], "count": len(all_lists), } async def find_list_by_name(self, name: str, space_id: Optional[str] = None) -> Dict[str, Any]: """Find list by name.""" lst = await self.client.find_list_by_name(name, space_id) if not lst: return {"error": f"List '{name}' not found"} return { "id": lst.id, "name": lst.name, "space": lst.space.get("name", "Unknown"), "folder": lst.folder.get("name") if lst.folder else None, "found": True, } # Bulk operations async def bulk_update_tasks( self, task_ids: List[str], updates: Dict[str, Any] ) -> Dict[str, Any]: """Update multiple tasks at once.""" results = {"updated": [], "failed": []} # Convert updates dict to UpdateTaskRequest format update_request = UpdateTaskRequest() if "status" in updates: update_request.status = updates["status"] if "priority" in updates: update_request.priority = updates["priority"] if "assignees_add" in updates: update_request.assignees = {"add": updates["assignees_add"]} if "assignees_remove" in updates: if update_request.assignees is None: update_request.assignees = {} update_request.assignees["rem"] = updates["assignees_remove"] for task_id in task_ids: try: # Resolve each task ID to get the internal ID resolved_task = await self._resolve_task_id(task_id) await self.client.update_task(resolved_task.id, update_request) results["updated"].append(task_id) except Exception as e: results["failed"].append({"task_id": task_id, "error": str(e)}) return results async def bulk_move_tasks(self, task_ids: List[str], target_list_id: str) -> Dict[str, Any]: """Move multiple tasks to a different list.""" results = {"moved": [], "failed": []} for task_id in task_ids: try: # Resolve each task ID to get the internal ID resolved_task = await self._resolve_task_id(task_id) # Moving tasks requires updating the list property await self.client._request( "PUT", f"/task/{resolved_task.id}", json={"list": target_list_id} ) results["moved"].append(task_id) except Exception as e: results["failed"].append({"task_id": task_id, "error": str(e)}) return results # Time tracking async def get_time_tracked( self, user_id: Optional[int] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, ) -> Dict[str, Any]: """Get time tracked for tasks.""" # If no dates provided, default to last 7 days if not start_date: start_date = (datetime.now() - timedelta(days=7)).isoformat() if not end_date: end_date = datetime.now().isoformat() start_ts = int(datetime.fromisoformat(start_date.replace("Z", "+00:00")).timestamp() * 1000) end_ts = int(datetime.fromisoformat(end_date.replace("Z", "+00:00")).timestamp() * 1000) # Get time entries params = { "start_date": str(start_ts), "end_date": str(end_ts), } if user_id: params["assignee"] = str(user_id) workspace_id = self.client.config.default_workspace_id if not workspace_id: workspaces = await self.client.get_workspaces() workspace_id = workspaces[0].id data = await self.client._request( "GET", f"/team/{workspace_id}/time_entries", params=params ) total_ms = sum(entry.get("duration", 0) for entry in data.get("data", [])) total_hours = total_ms / (1000 * 60 * 60) return { "total_milliseconds": total_ms, "total_hours": round(total_hours, 2), "entries": len(data.get("data", [])), "period": { "start": start_date, "end": end_date, }, } async def log_time( self, task_id: str, duration: str, description: Optional[str] = None, ) -> Dict[str, Any]: """Log time spent on a task.""" try: # First resolve the task to get the internal ID resolved_task = await self._resolve_task_id(task_id) duration_ms = parse_duration(duration) except ClickUpAPIError as e: return {"error": f"Failed to resolve task '{task_id}': {e!s}"} # Create time entry payload = { "duration": duration_ms, "task_id": resolved_task.id, } if description: payload["description"] = description workspace_id = self.client.config.default_workspace_id if not workspace_id: workspaces = await self.client.get_workspaces() workspace_id = workspaces[0].id await self.client._request("POST", f"/team/{workspace_id}/time_entries", json=payload) return { "logged": True, "duration": duration, "duration_ms": duration_ms, "task_id": resolved_task.id, } # Templates and chains async def create_task_from_template( self, template_name: str, list_id: str, customizations: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Create a task from a template.""" # Define common templates templates = { "bug_report": { "name": "Bug Report: {title}", "description": "## Description\n\n## Steps to Reproduce\n1. \n2. \n3. \n\n## Expected Behavior\n\n## Actual Behavior\n\n## Environment\n", "priority": 2, "tags": ["bug"], }, "feature_request": { "name": "Feature: {title}", "description": "## Feature Description\n\n## User Story\nAs a [user type], I want [goal] so that [benefit].\n\n## Acceptance Criteria\n- [ ] \n- [ ] \n\n## Technical Notes\n", "priority": 3, "tags": ["feature"], }, "code_review": { "name": "Code Review: {title}", "description": "## PR Link\n\n## Changes Summary\n\n## Checklist\n- [ ] Code follows style guidelines\n- [ ] Tests added/updated\n- [ ] Documentation updated\n- [ ] No console errors\n", "priority": 2, "tags": ["review"], }, } template = templates.get(template_name) if not template: return {"error": f"Template '{template_name}' not found"} # Apply customizations task_data = template.copy() if customizations: task_data["name"] = task_data["name"].format(**customizations) task_data.update(customizations) # Create task task_request = CreateTaskRequest( name=task_data["name"], description=task_data.get("description"), priority=task_data.get("priority"), tags=task_data.get("tags"), ) task = await self.client.create_task(list_id, task_request) return { "id": task.id, "name": task.name, "template": template_name, "url": format_task_url(task.id), } async def create_task_chain( self, tasks: List[Dict[str, Any]], list_id: str, auto_link: bool = True, ) -> Dict[str, Any]: """Create a chain of dependent tasks.""" created_tasks = [] for i, task_data in enumerate(tasks): # Create task task_request = CreateTaskRequest( name=task_data["title"], description=task_data.get("description"), ) # Parse time estimate if provided if "time_estimate" in task_data: task_request.time_estimate = parse_duration(task_data["time_estimate"]) # Link to previous task if auto_link is enabled if auto_link and i > 0 and created_tasks: task_request.links_to = created_tasks[-1]["id"] task = await self.client.create_task(list_id, task_request) created_tasks.append( { "id": task.id, "name": task.name, "url": format_task_url(task.id), } ) return { "created": len(created_tasks), "tasks": created_tasks, "linked": auto_link, } # Analytics async def get_team_workload( self, space_id: str, include_completed: bool = False ) -> Dict[str, Any]: """Get workload distribution across team members.""" # Get all tasks in the space tasks = await self.client.get_tasks( space_id=space_id, include_closed=include_completed, ) # Group by assignee workload = {} unassigned_count = 0 for task in tasks: if not task.assignees: unassigned_count += 1 else: for assignee in task.assignees: username = assignee.username if username not in workload: workload[username] = { "user_id": assignee.id, "username": username, "task_count": 0, "total_time_estimate": 0, "by_priority": {1: 0, 2: 0, 3: 0, 4: 0}, } workload[username]["task_count"] += 1 if task.time_estimate: workload[username]["total_time_estimate"] += task.time_estimate if task.priority: workload[username]["by_priority"][task.priority.value] += 1 # Convert time estimates to hours for user_data in workload.values(): user_data["total_hours_estimate"] = round( user_data["total_time_estimate"] / (1000 * 60 * 60), 2 ) return { "space_id": space_id, "team_workload": list(workload.values()), "unassigned_tasks": unassigned_count, "total_tasks": len(tasks), } async def get_task_analytics(self, space_id: str, period_days: int = 30) -> Dict[str, Any]: """Get analytics for tasks.""" # Calculate date range end_date = datetime.now() start_date = end_date - timedelta(days=period_days) # Get tasks created in period tasks = await self.client.search_tasks( query="", date_created_gt=int(start_date.timestamp() * 1000), date_created_lt=int(end_date.timestamp() * 1000), ) # Calculate metrics total_tasks = len(tasks) completed_tasks = sum(1 for task in tasks if task.status.type == "closed") # Calculate average completion time completion_times = [] for task in tasks: if task.date_closed and task.date_created: time_to_complete = (task.date_closed - task.date_created).total_seconds() / 3600 completion_times.append(time_to_complete) avg_completion_time = ( sum(completion_times) / len(completion_times) if completion_times else 0 ) # Tasks by priority by_priority = {1: 0, 2: 0, 3: 0, 4: 0} for task in tasks: if task.priority: by_priority[task.priority.value] += 1 return { "space_id": space_id, "period_days": period_days, "metrics": { "total_tasks_created": total_tasks, "completed_tasks": completed_tasks, "completion_rate": round( completed_tasks / total_tasks * 100 if total_tasks > 0 else 0, 2 ), "avg_completion_hours": round(avg_completion_time, 2), "tasks_per_day": round(total_tasks / period_days, 2), }, "by_priority": by_priority, } # User management async def list_users(self, workspace_id: Optional[str] = None) -> Dict[str, Any]: """List all users in the workspace.""" members = await self.client.get_workspace_members(workspace_id) return { "users": [ { "id": member.get("id"), "username": member.get("username"), "email": member.get("email"), "initials": member.get("initials"), "color": member.get("color"), "profilePicture": member.get("profilePicture"), } for member in members ], "count": len(members), } async def get_current_user(self) -> Dict[str, Any]: """Get details of the currently authenticated user.""" user = await self.client.get_current_user() return { "id": user.get("id"), "username": user.get("username"), "email": user.get("email"), "initials": user.get("initials"), "color": user.get("color"), "profilePicture": user.get("profilePicture"), "role": user.get("role"), } async def find_user_by_name( self, name: str, workspace_id: Optional[str] = None ) -> Dict[str, Any]: """Find a user by name or email.""" members = await self.client.get_workspace_members(workspace_id) # Search by username or email (case-insensitive) name_lower = name.lower() matches = [] for member in members: username = member.get("username", "").lower() email = member.get("email", "").lower() if name_lower in username or name_lower in email: matches.append( { "id": member.get("id"), "username": member.get("username"), "email": member.get("email"), "initials": member.get("initials"), "color": member.get("color"), "profilePicture": member.get("profilePicture"), } ) if not matches: return {"error": f"No user found matching '{name}'", "matches": []} return { "matches": matches, "count": len(matches), "found": True, }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DiversioTeam/clickup-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server