We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/tallpizza/dooray-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""Tasks tool for managing Dooray tasks."""
import json
import logging
import os
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
class TasksTool:
"""Tool for managing Dooray tasks."""
def __init__(self, dooray_client):
"""Initialize with Dooray client."""
self.client = dooray_client
async def handle(self, arguments: Dict[str, Any]) -> str:
"""Handle tasks tool requests.
Args:
arguments: Tool arguments containing action and parameters
Returns:
JSON string with results
"""
action = arguments.get("action")
if not action:
return json.dumps({"error": "Action parameter is required"})
try:
if action == "list":
return await self._list_tasks(arguments)
elif action == "get":
return await self._get_task(arguments)
elif action == "create":
return await self._create_task(arguments)
elif action == "update":
return await self._update_task(arguments)
elif action == "delete":
return await self._delete_task(arguments)
elif action == "change_status":
return await self._change_status(arguments)
elif action == "assign":
return await self._assign_task(arguments)
else:
return json.dumps({"error": f"Unknown action: {action}"})
except Exception as e:
logger.error(f"Error in tasks tool: {e}")
return json.dumps({"error": str(e)})
def _resolve_project_id(self, arguments: Dict[str, Any]) -> Optional[str]:
"""Resolve project ID from arguments or defaults."""
project_id = arguments.get("projectId") or self.client.project_id or os.getenv("DOORAY_DEFAULT_PROJECT_ID")
if project_id:
return str(project_id)
return None
def _ensure_task_id(self, arguments: Dict[str, Any]) -> Optional[str]:
task_id = arguments.get("taskId")
if task_id is None:
return None
return str(task_id)
def _ensure_assignee_id(self, arguments: Dict[str, Any]) -> Optional[str]:
assignee_id = arguments.get("assigneeId")
if assignee_id is None:
return None
return str(assignee_id)
async def _list_tasks(self, arguments: Dict[str, Any]) -> str:
"""List tasks in a project."""
project_id = self._resolve_project_id(arguments)
if not project_id:
return json.dumps({"error": "projectId is required for list action (set DOORAY_DEFAULT_PROJECT_ID or provide projectId)"})
# Build query parameters
params = {}
if arguments.get("status"):
params["workflowClass"] = arguments["status"]
assignee_id = self._ensure_assignee_id(arguments)
if assignee_id:
params["assigneeId"] = assignee_id
result = await self.client.list_tasks(project_id, **params)
return json.dumps(result, ensure_ascii=False)
async def _get_task(self, arguments: Dict[str, Any]) -> str:
"""Get a specific task."""
project_id = self._resolve_project_id(arguments)
task_id = self._ensure_task_id(arguments)
if not project_id or not task_id:
return json.dumps({"error": "projectId and taskId are required for get action"})
result = await self.client.get_task(project_id, task_id)
return json.dumps(result, ensure_ascii=False)
async def _create_task(self, arguments: Dict[str, Any]) -> str:
"""Create a new task."""
project_id = self._resolve_project_id(arguments)
title = arguments.get("title")
if not project_id or not title:
return json.dumps({"error": "projectId and title are required for create action"})
# Build task data
task_data = {
"subject": title,
"body": {
"mimeType": "text/x-markdown",
"content": arguments.get("description", "")
}
}
assignee_id = self._ensure_assignee_id(arguments)
if assignee_id:
task_data["users"] = [{"member": {"id": assignee_id}}]
if arguments.get("priority"):
task_data["priority"] = arguments["priority"]
if arguments.get("status"):
task_data["workflowClass"] = arguments["status"]
result = await self.client.create_task(project_id, task_data)
return json.dumps(result, ensure_ascii=False)
async def _update_task(self, arguments: Dict[str, Any]) -> str:
"""Update an existing task."""
project_id = self._resolve_project_id(arguments)
task_id = self._ensure_task_id(arguments)
if not project_id or not task_id:
return json.dumps({"error": "projectId and taskId are required for update action"})
# Build update data
task_data = {}
if arguments.get("title"):
task_data["subject"] = arguments["title"]
if arguments.get("description"):
task_data["body"] = {
"mimeType": "text/x-markdown",
"content": arguments["description"]
}
assignee_id = self._ensure_assignee_id(arguments)
if assignee_id:
task_data["users"] = [{"member": {"id": assignee_id}}]
if arguments.get("priority"):
task_data["priority"] = arguments["priority"]
if arguments.get("status"):
task_data["workflowClass"] = arguments["status"]
result = await self.client.update_task(project_id, task_id, task_data)
return json.dumps(result, ensure_ascii=False)
async def _delete_task(self, arguments: Dict[str, Any]) -> str:
"""Delete a task."""
project_id = self._resolve_project_id(arguments)
task_id = self._ensure_task_id(arguments)
if not project_id or not task_id:
return json.dumps({"error": "projectId and taskId are required for delete action"})
result = await self.client.delete_task(project_id, task_id)
return json.dumps({"success": True, "message": "Task deleted successfully"})
async def _change_status(self, arguments: Dict[str, Any]) -> str:
"""Change task status."""
project_id = self._resolve_project_id(arguments)
task_id = self._ensure_task_id(arguments)
status = arguments.get("status")
workflow_id_arg = arguments.get("workflowId")
if not project_id or not task_id or (status is None and workflow_id_arg is None):
return json.dumps({"error": "projectId, taskId, and either status or workflowId are required for change_status action"})
workflow_id = str(workflow_id_arg) if workflow_id_arg is not None else None
if not workflow_id and status is not None:
workflow_id = await self._resolve_workflow_id(project_id, status)
if not workflow_id:
return json.dumps({"error": "Unable to resolve workflow. Provide workflowId or use status/workflow name."}, ensure_ascii=False)
result = await self.client.set_task_workflow(project_id, task_id, workflow_id)
return json.dumps(result, ensure_ascii=False)
async def _assign_task(self, arguments: Dict[str, Any]) -> str:
"""Assign task to a member."""
project_id = self._resolve_project_id(arguments)
task_id = self._ensure_task_id(arguments)
assignee_id = self._ensure_assignee_id(arguments)
if not task_id or not assignee_id:
return json.dumps({"error": "taskId and assigneeId are required for assign action"})
task_data = {"users": [{"member": {"id": assignee_id}}]}
result = await self.client.update_task(project_id, task_id, task_data)
return json.dumps(result, ensure_ascii=False)
async def _resolve_workflow_id(self, project_id: str, identifier: str) -> Optional[str]:
if identifier is None:
return None
identifier_text = str(identifier).strip()
if not identifier_text:
return None
workflows = await self.client.list_workflows(project_id)
items = workflows.get("result") or []
normalized = identifier_text.lower()
class_matches: list[tuple[int, str]] = []
for item in items:
workflow_id = str(item.get("id")) if item.get("id") is not None else None
if not workflow_id:
continue
if workflow_id == identifier_text:
return workflow_id
name = item.get("name")
if isinstance(name, str) and name.lower() == normalized:
return workflow_id
for localized in item.get("names", []) or []:
localized_name = localized.get("name")
if isinstance(localized_name, str) and localized_name.lower() == normalized:
return workflow_id
workflow_class = item.get("class")
if isinstance(workflow_class, str) and workflow_class.lower() == normalized:
order_value = item.get("order")
try:
order = int(order_value)
except (TypeError, ValueError):
order = 0
class_matches.append((order, workflow_id))
if class_matches:
class_matches.sort(key=lambda pair: pair[0])
return class_matches[0][1]
return None