Skip to main content
Glama
tasks.py9.77 kB
"""Tasks tool for managing Dooray tasks.""" import json import logging import os from typing import Any, Dict, Optional logger = logging.getLogger(__name__) class TasksTool: """Tool for managing Dooray tasks.""" def __init__(self, dooray_client): """Initialize with Dooray client.""" self.client = dooray_client async def handle(self, arguments: Dict[str, Any]) -> str: """Handle tasks tool requests. Args: arguments: Tool arguments containing action and parameters Returns: JSON string with results """ action = arguments.get("action") if not action: return json.dumps({"error": "Action parameter is required"}) try: if action == "list": return await self._list_tasks(arguments) elif action == "get": return await self._get_task(arguments) elif action == "create": return await self._create_task(arguments) elif action == "update": return await self._update_task(arguments) elif action == "delete": return await self._delete_task(arguments) elif action == "change_status": return await self._change_status(arguments) elif action == "assign": return await self._assign_task(arguments) else: return json.dumps({"error": f"Unknown action: {action}"}) except Exception as e: logger.error(f"Error in tasks tool: {e}") return json.dumps({"error": str(e)}) def _resolve_project_id(self, arguments: Dict[str, Any]) -> Optional[str]: """Resolve project ID from arguments or defaults.""" project_id = arguments.get("projectId") or self.client.project_id or os.getenv("DOORAY_DEFAULT_PROJECT_ID") if project_id: return str(project_id) return None def _ensure_task_id(self, arguments: Dict[str, Any]) -> Optional[str]: task_id = arguments.get("taskId") if task_id is None: return None return str(task_id) def _ensure_assignee_id(self, arguments: Dict[str, Any]) -> Optional[str]: assignee_id = arguments.get("assigneeId") if assignee_id is None: return None return str(assignee_id) async def _list_tasks(self, arguments: Dict[str, Any]) -> str: """List tasks in a project.""" project_id = self._resolve_project_id(arguments) if not project_id: return json.dumps({"error": "projectId is required for list action (set DOORAY_DEFAULT_PROJECT_ID or provide projectId)"}) # Build query parameters params = {} if arguments.get("status"): params["workflowClass"] = arguments["status"] assignee_id = self._ensure_assignee_id(arguments) if assignee_id: params["assigneeId"] = assignee_id result = await self.client.list_tasks(project_id, **params) return json.dumps(result, ensure_ascii=False) async def _get_task(self, arguments: Dict[str, Any]) -> str: """Get a specific task.""" project_id = self._resolve_project_id(arguments) task_id = self._ensure_task_id(arguments) if not project_id or not task_id: return json.dumps({"error": "projectId and taskId are required for get action"}) result = await self.client.get_task(project_id, task_id) return json.dumps(result, ensure_ascii=False) async def _create_task(self, arguments: Dict[str, Any]) -> str: """Create a new task.""" project_id = self._resolve_project_id(arguments) title = arguments.get("title") if not project_id or not title: return json.dumps({"error": "projectId and title are required for create action"}) # Build task data task_data = { "subject": title, "body": { "mimeType": "text/x-markdown", "content": arguments.get("description", "") } } assignee_id = self._ensure_assignee_id(arguments) if assignee_id: task_data["users"] = [{"member": {"id": assignee_id}}] if arguments.get("priority"): task_data["priority"] = arguments["priority"] if arguments.get("status"): task_data["workflowClass"] = arguments["status"] result = await self.client.create_task(project_id, task_data) return json.dumps(result, ensure_ascii=False) async def _update_task(self, arguments: Dict[str, Any]) -> str: """Update an existing task.""" project_id = self._resolve_project_id(arguments) task_id = self._ensure_task_id(arguments) if not project_id or not task_id: return json.dumps({"error": "projectId and taskId are required for update action"}) # Build update data task_data = {} if arguments.get("title"): task_data["subject"] = arguments["title"] if arguments.get("description"): task_data["body"] = { "mimeType": "text/x-markdown", "content": arguments["description"] } assignee_id = self._ensure_assignee_id(arguments) if assignee_id: task_data["users"] = [{"member": {"id": assignee_id}}] if arguments.get("priority"): task_data["priority"] = arguments["priority"] if arguments.get("status"): task_data["workflowClass"] = arguments["status"] result = await self.client.update_task(project_id, task_id, task_data) return json.dumps(result, ensure_ascii=False) async def _delete_task(self, arguments: Dict[str, Any]) -> str: """Delete a task.""" project_id = self._resolve_project_id(arguments) task_id = self._ensure_task_id(arguments) if not project_id or not task_id: return json.dumps({"error": "projectId and taskId are required for delete action"}) result = await self.client.delete_task(project_id, task_id) return json.dumps({"success": True, "message": "Task deleted successfully"}) async def _change_status(self, arguments: Dict[str, Any]) -> str: """Change task status.""" project_id = self._resolve_project_id(arguments) task_id = self._ensure_task_id(arguments) status = arguments.get("status") workflow_id_arg = arguments.get("workflowId") if not project_id or not task_id or (status is None and workflow_id_arg is None): return json.dumps({"error": "projectId, taskId, and either status or workflowId are required for change_status action"}) workflow_id = str(workflow_id_arg) if workflow_id_arg is not None else None if not workflow_id and status is not None: workflow_id = await self._resolve_workflow_id(project_id, status) if not workflow_id: return json.dumps({"error": "Unable to resolve workflow. Provide workflowId or use status/workflow name."}, ensure_ascii=False) result = await self.client.set_task_workflow(project_id, task_id, workflow_id) return json.dumps(result, ensure_ascii=False) async def _assign_task(self, arguments: Dict[str, Any]) -> str: """Assign task to a member.""" project_id = self._resolve_project_id(arguments) task_id = self._ensure_task_id(arguments) assignee_id = self._ensure_assignee_id(arguments) if not task_id or not assignee_id: return json.dumps({"error": "taskId and assigneeId are required for assign action"}) task_data = {"users": [{"member": {"id": assignee_id}}]} result = await self.client.update_task(project_id, task_id, task_data) return json.dumps(result, ensure_ascii=False) async def _resolve_workflow_id(self, project_id: str, identifier: str) -> Optional[str]: if identifier is None: return None identifier_text = str(identifier).strip() if not identifier_text: return None workflows = await self.client.list_workflows(project_id) items = workflows.get("result") or [] normalized = identifier_text.lower() class_matches: list[tuple[int, str]] = [] for item in items: workflow_id = str(item.get("id")) if item.get("id") is not None else None if not workflow_id: continue if workflow_id == identifier_text: return workflow_id name = item.get("name") if isinstance(name, str) and name.lower() == normalized: return workflow_id for localized in item.get("names", []) or []: localized_name = localized.get("name") if isinstance(localized_name, str) and localized_name.lower() == normalized: return workflow_id workflow_class = item.get("class") if isinstance(workflow_class, str) and workflow_class.lower() == normalized: order_value = item.get("order") try: order = int(order_value) except (TypeError, ValueError): order = 0 class_matches.append((order, workflow_id)) if class_matches: class_matches.sort(key=lambda pair: pair[0]) return class_matches[0][1] return None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tallpizza/dooray-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server