Skip to main content
Glama
client.py (15.9 kB)
"""ClickUp API client implementation."""

import logging
from typing import Any, Dict, List, Optional

import httpx

from .config import Config
from .models import CreateTaskRequest, Folder, Space, Task, UpdateTaskRequest, Workspace
from .models import List as ClickUpList

logger = logging.getLogger(__name__)

# Keys copied from a raw user payload into the normalized member dict
# returned by ``ClickUpClient.get_workspace_members``.
_MEMBER_FIELDS = ("id", "username", "email", "initials", "color", "profilePicture")


class ClickUpAPIError(Exception):
    """ClickUp API error.

    Attributes:
        status_code: HTTP status code of the failing response, or ``None``
            when the failure happened before a response was received
            (timeout, connection error).
    """

    def __init__(self, message: str, status_code: Optional[int] = None) -> None:
        super().__init__(message)
        self.status_code = status_code


class ClickUpClient:
    """Client for interacting with ClickUp API."""

    BASE_URL = "https://api.clickup.com/api/v2"

    def __init__(self, config: Config) -> None:
        """Initialize the client with configuration.

        Args:
            config: Supplies the request headers (``config.headers``) and the
                optional ``default_workspace_id`` used when a call omits one.
        """
        self.config = config
        self.client = httpx.AsyncClient(
            base_url=self.BASE_URL,
            headers=config.headers,
            timeout=30.0,
        )

    async def __aenter__(self) -> "ClickUpClient":
        """Async context manager entry."""
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Async context manager exit; closes the HTTP client."""
        await self.close()

    async def close(self) -> None:
        """Close the HTTP client."""
        await self.client.aclose()

    # Internal helpers

    async def _request(
        self,
        method: str,
        path: str,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """Make an API request and handle errors.

        Args:
            method: HTTP method name.
            path: Path relative to ``BASE_URL``.
            **kwargs: Passed through to ``httpx.AsyncClient.request``.

        Returns:
            The decoded JSON body, or an empty dict when the response has no
            body.

        Raises:
            ClickUpAPIError: On timeout, transport failure, a status code
                >= 400, or a success response whose body is not valid JSON.
        """
        # Only the network call can raise httpx transport errors, so keep the
        # try body minimal.
        try:
            response = await self.client.request(method, path, **kwargs)
        except httpx.TimeoutException as e:
            raise ClickUpAPIError("Request timed out") from e
        except httpx.RequestError as e:
            raise ClickUpAPIError(f"Request failed: {e!s}") from e

        if response.status_code >= 400:
            error_msg = f"API error: {response.status_code}"
            try:
                error_data = response.json()
            except ValueError:
                # Body is not JSON; fall back to the raw text.
                error_msg = f"{error_msg} - {response.text}"
            else:
                if isinstance(error_data, dict) and "err" in error_data:
                    error_msg = f"{error_msg} - {error_data['err']}"
            raise ClickUpAPIError(error_msg, response.status_code)

        # Some successful responses carry no body; decoding an empty body
        # would raise, so report it as an empty payload.
        if not response.content:
            return {}

        try:
            return response.json()
        except ValueError as e:
            raise ClickUpAPIError(
                "Invalid JSON in API response", response.status_code
            ) from e

    async def _find_workspace_id(
        self, workspace_id: Optional[str] = None
    ) -> Optional[str]:
        """Resolve a workspace ID, or return ``None`` if none is available.

        Resolution order: the explicit argument, then
        ``config.default_workspace_id``, then the first workspace returned by
        ``get_workspaces``.
        """
        workspace_id = workspace_id or self.config.default_workspace_id
        if workspace_id:
            return workspace_id
        workspaces = await self.get_workspaces()
        return workspaces[0].id if workspaces else None

    async def _require_workspace_id(self, workspace_id: Optional[str] = None) -> str:
        """Resolve a workspace ID like ``_find_workspace_id`` but raise if absent.

        Raises:
            ClickUpAPIError: If no workspace can be determined.
        """
        resolved = await self._find_workspace_id(workspace_id)
        if resolved is None:
            raise ClickUpAPIError("No workspaces found")
        return resolved

    @staticmethod
    def _format_member(user: Dict[str, Any]) -> Dict[str, Any]:
        """Project a raw user payload onto the fixed set of member fields."""
        return {field: user.get(field) for field in _MEMBER_FIELDS}

    # User endpoints

    async def get_current_user(self) -> Dict[str, Any]:
        """Get the current authenticated user."""
        data = await self._request("GET", "/user")
        return data.get("user", {})

    async def get_workspace_members(
        self, workspace_id: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Get all members of a workspace.

        Note: This endpoint requires the workspace to be on an Enterprise plan
        for full functionality. For non-Enterprise plans, it may return
        limited data.

        Three sources are tried in order, and the first one yielding any
        members wins: user groups, the team record, then the current user.

        Args:
            workspace_id: Workspace to inspect; resolved from the config or
                the first available workspace when omitted.

        Returns:
            A list of dicts with the keys ``id``, ``username``, ``email``,
            ``initials``, ``color`` and ``profilePicture``. Empty if every
            source fails.

        Raises:
            ClickUpAPIError: If no workspace can be determined.
        """
        workspace_id = await self._require_workspace_id(workspace_id)

        # First try the groups endpoint which is more widely available.
        try:
            data = await self._request(
                "GET",
                "/group",
                # Scope the group listing to the resolved workspace.
                params={"team_id": workspace_id},
            )
            # Extract unique members from all groups, keeping first occurrence.
            members_by_id: Dict[Any, Dict[str, Any]] = {}
            for group in data.get("groups", []):
                for member in group.get("members", []):
                    if not isinstance(member, dict):
                        continue
                    user_id = member.get("id")
                    if user_id and user_id not in members_by_id:
                        members_by_id[user_id] = self._format_member(member)
            if members_by_id:
                return list(members_by_id.values())
        except ClickUpAPIError:
            logger.warning("Groups endpoint failed, trying other endpoints")

        # Try the team endpoint which might have member information.
        try:
            data = await self._request("GET", f"/team/{workspace_id}")
            formatted_members = []
            for member in data.get("team", {}).get("members", []):
                if not isinstance(member, dict):
                    continue
                # Members may be wrapped as {"user": {...}} or given flat.
                user_data = member.get("user", member)
                if isinstance(user_data, dict):
                    formatted_members.append(self._format_member(user_data))
            if formatted_members:
                return formatted_members
        except ClickUpAPIError:
            logger.warning("Team endpoint also failed")

        # Final fallback: get current user and return as single-item list.
        try:
            current_user = await self.get_current_user()
            return [self._format_member(current_user)]
        except ClickUpAPIError:
            logger.warning("Unable to fetch any workspace members")
            return []

    # Workspace/Team endpoints

    async def get_workspaces(self) -> List[Workspace]:
        """Get all workspaces/teams."""
        data = await self._request("GET", "/team")
        return [Workspace(**team) for team in data.get("teams", [])]

    # Space endpoints

    async def get_spaces(self, workspace_id: Optional[str] = None) -> List[Space]:
        """Get all spaces in a workspace.

        Raises:
            ClickUpAPIError: If no workspace can be determined or the request
                fails.
        """
        workspace_id = await self._require_workspace_id(workspace_id)
        data = await self._request("GET", f"/team/{workspace_id}/space")
        return [Space(**space) for space in data.get("spaces", [])]

    async def get_space(self, space_id: str) -> Space:
        """Get a specific space."""
        data = await self._request("GET", f"/space/{space_id}")
        return Space(**data)

    # Folder endpoints

    async def get_folders(self, space_id: str) -> List[Folder]:
        """Get all folders in a space."""
        data = await self._request("GET", f"/space/{space_id}/folder")
        return [Folder(**folder) for folder in data.get("folders", [])]

    async def get_folder(self, folder_id: str) -> Folder:
        """Get a specific folder."""
        data = await self._request("GET", f"/folder/{folder_id}")
        return Folder(**data)

    # List endpoints

    async def get_lists(
        self,
        folder_id: Optional[str] = None,
        space_id: Optional[str] = None,
    ) -> List[ClickUpList]:
        """Get lists from a folder or space.

        ``folder_id`` takes precedence when both are given.

        Raises:
            ValueError: If neither ``folder_id`` nor ``space_id`` is provided.
        """
        if folder_id:
            data = await self._request("GET", f"/folder/{folder_id}/list")
        elif space_id:
            data = await self._request("GET", f"/space/{space_id}/list")
        else:
            raise ValueError("Either folder_id or space_id must be provided")

        return [ClickUpList(**lst) for lst in data.get("lists", [])]

    async def get_list(self, list_id: str) -> ClickUpList:
        """Get a specific list."""
        data = await self._request("GET", f"/list/{list_id}")
        return ClickUpList(**data)

    async def find_list_by_name(
        self,
        name: str,
        space_id: Optional[str] = None,
    ) -> Optional[ClickUpList]:
        """Find a list by name in a space.

        The comparison is case-insensitive. When ``space_id`` is omitted,
        every space returned by ``get_spaces`` is searched in order and the
        first match is returned.

        Returns:
            The matching list, or ``None`` if no list has that name.
        """
        if not space_id:
            # Search in all spaces if not specified.
            for space in await self.get_spaces():
                result = await self.find_list_by_name(name, space.id)
                if result:
                    return result
            return None

        # Lists attached directly to the space, then those inside its folders.
        lists = await self.get_lists(space_id=space_id)
        for folder in await self.get_folders(space_id):
            lists.extend(await self.get_lists(folder_id=folder.id))

        name_lower = name.lower()
        return next((lst for lst in lists if lst.name.lower() == name_lower), None)

    # Task endpoints

    async def create_task(
        self,
        list_id: str,
        task: CreateTaskRequest,
    ) -> Task:
        """Create a new task in the given list.

        Fields of ``task`` that are ``None`` are omitted from the request body.
        """
        data = await self._request(
            "POST",
            f"/list/{list_id}/task",
            json=task.model_dump(exclude_none=True),
        )
        return Task(**data)

    async def get_task(
        self,
        task_id: str,
        include_subtasks: bool = False,
        custom_task_ids: bool = False,
        team_id: Optional[str] = None,
    ) -> Task:
        """Get a task by ID.

        Each optional argument is only sent as a query parameter when truthy.
        """
        params: Dict[str, str] = {}
        if include_subtasks:
            params["include_subtasks"] = "true"
        if custom_task_ids:
            params["custom_task_ids"] = "true"
        if team_id:
            params["team_id"] = team_id

        data = await self._request("GET", f"/task/{task_id}", params=params)
        return Task(**data)

    async def update_task(
        self,
        task_id: str,
        updates: UpdateTaskRequest,
    ) -> Task:
        """Update a task.

        Fields of ``updates`` that are ``None`` are omitted from the request
        body.
        """
        data = await self._request(
            "PUT",
            f"/task/{task_id}",
            json=updates.model_dump(exclude_none=True),
        )
        return Task(**data)

    async def delete_task(self, task_id: str) -> None:
        """Delete a task."""
        await self._request("DELETE", f"/task/{task_id}")

    async def get_tasks(
        self,
        list_id: Optional[str] = None,
        space_id: Optional[str] = None,
        folder_id: Optional[str] = None,
        archived: bool = False,
        page: int = 0,
        order_by: str = "created",
        statuses: Optional[List[str]] = None,
        assignees: Optional[List[int]] = None,
        tags: Optional[List[str]] = None,
        include_closed: bool = False,
    ) -> List[Task]:
        """Get tasks with various filters.

        Exactly one container is queried, chosen in the order ``list_id``,
        ``folder_id``, ``space_id``.

        Raises:
            ValueError: If none of ``list_id``, ``folder_id`` or ``space_id``
                is provided.
        """
        params: Dict[str, Any] = {
            "archived": str(archived).lower(),
            "page": str(page),
            "order_by": order_by,
            "include_closed": str(include_closed).lower(),
        }
        if statuses:
            params["statuses[]"] = statuses
        if assignees:
            params["assignees[]"] = assignees
        if tags:
            params["tags[]"] = tags

        if list_id:
            path = f"/list/{list_id}/task"
        elif folder_id:
            path = f"/folder/{folder_id}/task"
        elif space_id:
            path = f"/space/{space_id}/task"
        else:
            raise ValueError("One of list_id, folder_id, or space_id must be provided")

        data = await self._request("GET", path, params=params)
        return [Task(**task) for task in data.get("tasks", [])]

    async def search_tasks(
        self,
        workspace_id: Optional[str] = None,
        query: Optional[str] = None,
        statuses: Optional[List[str]] = None,
        assignees: Optional[List[int]] = None,
        tags: Optional[List[str]] = None,
        date_created_gt: Optional[int] = None,
        date_created_lt: Optional[int] = None,
        date_updated_gt: Optional[int] = None,
        date_updated_lt: Optional[int] = None,
    ) -> List[Task]:
        """Search tasks across the workspace.

        Raises:
            ClickUpAPIError: If no workspace can be determined or the request
                fails.
        """
        workspace_id = await self._require_workspace_id(workspace_id)

        params: Dict[str, Any] = {}
        if query:
            params["query"] = query
        if statuses:
            params["statuses[]"] = statuses
        if assignees:
            params["assignees[]"] = assignees
        if tags:
            params["tags[]"] = tags

        # Compare against None so that a bound of 0 is still sent.
        date_filters = {
            "date_created_gt": date_created_gt,
            "date_created_lt": date_created_lt,
            "date_updated_gt": date_updated_gt,
            "date_updated_lt": date_updated_lt,
        }
        for key, value in date_filters.items():
            if value is not None:
                params[key] = str(value)

        data = await self._request(
            "GET",
            f"/team/{workspace_id}/task",
            params=params,
        )
        return [Task(**task) for task in data.get("tasks", [])]

    async def get_task_comments(self, task_id: str) -> List[Dict[str, Any]]:
        """Get comments for a task."""
        data = await self._request("GET", f"/task/{task_id}/comment")
        return data.get("comments", [])

    async def create_task_comment(
        self,
        task_id: str,
        comment_text: str,
        assignee: Optional[int] = None,
        notify_all: bool = True,
    ) -> Dict[str, Any]:
        """Create a comment on a task.

        Returns:
            The decoded response body of the create-comment request.
        """
        payload: Dict[str, Any] = {
            "comment_text": comment_text,
            "notify_all": notify_all,
        }
        # Only "not provided" (None) suppresses the field.
        if assignee is not None:
            payload["assignee"] = assignee

        return await self._request(
            "POST",
            f"/task/{task_id}/comment",
            json=payload,
        )

    async def get_subtasks(self, parent_task_id: str) -> List[Task]:
        """Get subtasks of a parent task using team endpoint.

        This is best-effort: it returns an empty list when no workspace is
        available or when the task query fails.
        """
        # Get the workspace ID - use configured default or fetch from API.
        workspace_id = await self._find_workspace_id()
        if workspace_id is None:
            return []

        # Use team endpoint to get tasks with parent filter.
        params = {
            "parent": parent_task_id,
            "include_closed": "true",
        }

        try:
            data = await self._request(
                "GET", f"/team/{workspace_id}/task", params=params
            )
        except ClickUpAPIError as e:
            # No alternative lookup exists here; report "no subtasks".
            logger.warning("Team endpoint failed for subtasks: %s", e)
            return []

        return [Task(**task_data) for task_data in data.get("tasks", [])]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DiversioTeam/clickup-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.