Skip to main content
Glama
goodday_openwebui_complete_tool.py102 kB
""" title: Goodday Project Management author: Roney Dsilva author_url: https://github.com/cdmx1/goodday-mcp funding_url: https://github.com/cdmx1/goodday-mcp version: 1.1.0 required_open_webui_version: 0.5.3 """ import os import re import httpx import time from datetime import datetime, timezone, timedelta from typing import Callable from fastapi import Request from pydantic import BaseModel, Field class Tools: class Valves(BaseModel): api_key: str = Field("", description="Your Goodday API key") api_base: str = Field( "https://api.goodday.work/2.0", description="Goodday API base URL" ) search_url: str = Field( "https://example.com/webhook/goodday-mcp/search-tasks", description="Full VectorDB Search API endpoint URL", ) bearer_token: str = Field( "", description="Bearer token for search API authentication" ) def __init__(self): self.valves = self.Valves() self.user_agent = "goodday-openwebui-complete/1.1.0" self._user_cache = None # Cache for user data self._cache_expiry = 0 # Cache expiry timestamp async def _make_goodday_request( self, endpoint: str, method: str = "GET", data: dict = None, subfolders: bool = True ) -> dict: """Make a direct request to the Goodday API.""" api_token = self.valves.api_key or os.getenv("GOODDAY_API_TOKEN", "") if not api_token: raise ValueError( "GOODDAY_API_TOKEN environment variable or Valves.api_key is required" ) headers = { "User-Agent": self.user_agent, "gd-api-token": api_token, "Content-Type": "application/json", } # Automatically add subfolders=true for project task and document endpoints if not already present if subfolders and endpoint.startswith("project/") and ("/tasks" in endpoint or "/documents" in endpoint): if "?" in endpoint: if "subfolders=" not in endpoint: endpoint += "&subfolders=true" else: endpoint += "?subfolders=true" url = f"{self.valves.api_base}/{endpoint.lstrip('/')}" async with httpx.AsyncClient() as client: try: if method.upper() == "POST": response = await client.post( url, headers=headers, json=data, timeout=30.0 ) elif method.upper() == "PUT": response = await client.put( url, headers=headers, json=data, timeout=30.0 ) elif method.upper() == "DELETE": response = await client.delete(url, headers=headers, timeout=30.0) else: response = await client.get(url, headers=headers, timeout=30.0) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: raise Exception( f"HTTP error {e.response.status_code}: {e.response.text}" ) except httpx.RequestError as e: raise Exception(f"Request error: {str(e)}") except Exception as e: raise Exception(f"Unexpected error: {str(e)}") async def _make_search_request( self, method: str = "GET", params: dict = None ) -> dict: """Make a request to the search API with bearer token authentication.""" bearer_token = self.valves.bearer_token or os.getenv( "GOODDAY_SEARCH_BEARER_TOKEN", "" ) if not bearer_token: raise ValueError( "Bearer token is required for search API. Set it in Valves.bearer_token or GOODDAY_SEARCH_BEARER_TOKEN environment variable" ) headers = { "User-Agent": self.user_agent, "Authorization": f"Bearer {bearer_token}", "Content-Type": "application/json", } # Use the full URL directly from search_url url = str(self.valves.search_url).strip() async with httpx.AsyncClient() as client: try: if method.upper() == "GET": response = await client.get( url, headers=headers, params=params, timeout=30.0 ) else: response = await client.request( method.upper(), url, headers=headers, params=params, timeout=30.0, ) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: raise Exception( f"Search API HTTP error {e.response.status_code}: {e.response.text}" ) except httpx.RequestError as e: raise Exception(f"Search API request error: {str(e)}") except Exception as e: raise Exception(f"Search API unexpected error: {str(e)}") def _format_task(self, task: dict) -> str: """Format a task into a readable string with safe checks.""" if not isinstance(task, dict): return f"Invalid task data: {repr(task)}" # Defensive defaults in case nested keys are not dicts status = task.get("status") if isinstance(task.get("status"), dict) else {} project = task.get("project") if isinstance(task.get("project"), dict) else {} return f""" **Task ID:** {task.get('shortId', 'N/A')} **Title:** {task.get('name', 'N/A')} **Status:** {status.get('name', 'N/A')} **Project:** {project.get('name', 'N/A')} **Assigned To:** {task.get('assignedToUserId', 'N/A')} **Priority:** {task.get('priority', 'N/A')} **Start Date:** {task.get('startDate', 'N/A')} **End Date:** {task.get('endDate', 'N/A')} **Description:** {task.get('message', 'No description')} """.strip() def _format_project(self, project: dict) -> str: """Format a project into a readable string with safe checks.""" if not isinstance(project, dict): return f"Invalid project data: {repr(project)}" # Defensive defaults in case nested keys are not dicts status = ( project.get("status") if isinstance(project.get("status"), dict) else {} ) owner = project.get("owner") if isinstance(project.get("owner"), dict) else {} return f""" **Project ID:** {project.get('id', 'N/A')} **Name:** {project.get('name', 'N/A')} **Health:** {project.get('health', 'N/A')} **Status:** {status.get('name', 'N/A')} **Start Date:** {project.get('startDate', 'N/A')} **End Date:** {project.get('endDate', 'N/A')} **Progress:** {project.get('progress', 0)}% **Owner:** {owner.get('name', 'N/A')} """.strip() def _format_user(self, user: dict) -> str: """Format a user into a readable string with safe checks.""" if not isinstance(user, dict): return f"Invalid user data: {repr(user)}" # Defensive defaults in case nested keys are not dicts role = user.get("role") if isinstance(user.get("role"), dict) else {} return f""" **User ID:** {user.get('id', 'N/A')} **Name:** {user.get('name', 'N/A')} **Email:** {user.get('email', 'N/A')} **Role:** {role.get('name', 'N/A')} **Status:** {user.get('status', 'N/A')} """.strip() def _format_search_result(self, result: dict) -> str: """Format a search result into a readable string with safe checks.""" if not isinstance(result, dict): return f"Invalid search result data: {repr(result)}" # Safely get values and handle None cases task_id = result.get("taskId") or "N/A" title = result.get("title") or "N/A" content = result.get("content") or "No content" return f""" **Task ID:** {task_id} **Title:** {title} **Content:** {content} """.strip() async def _get_user_mapping(self, __event_emitter__: Callable = None) -> dict: """Get user ID to name mapping for displaying user names instead of IDs. Uses caching.""" import time # Check if cache is still valid (5 minutes TTL) current_time = time.time() if self._user_cache and (current_time - self._cache_expiry) < 300: if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Using cached user data ({len(self._user_cache)} users)", "done": False, }, } ) return self._user_cache if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": "Fetching fresh user data for name mapping...", "done": False, }, } ) users_data = await self._make_goodday_request("users") user_id_to_name = {} if isinstance(users_data, list): for u in users_data: if isinstance(u, dict): user_id_to_name[u.get("id")] = u.get("name", "Unknown") if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Loaded and cached {len(user_id_to_name)} users for name mapping", "done": False, }, } ) # Update cache self._user_cache = user_id_to_name self._cache_expiry = current_time return user_id_to_name def _create_user_display_function(self, user_id_to_name: dict): """Create a user display function with the given user mapping.""" def user_display(user_id): if not user_id: return "Unassigned" name = user_id_to_name.get(user_id) return name if name else f"User {user_id}" return user_display def _format_timestamp_ist(self, timestamp_str: str) -> str: """Format ISO timestamp to IST 12-hour format.""" if not timestamp_str or timestamp_str == 'N/A': return 'N/A' try: # Parse ISO timestamp (assuming UTC) dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) # Convert to IST (UTC+5:30) ist_tz = timezone(timedelta(hours=5, minutes=30)) dt_ist = dt.astimezone(ist_tz) # Format as 12-hour with AM/PM return dt_ist.strftime('%d %b %Y, %I:%M %p') except Exception: return timestamp_str # Return original if parsing fails async def _get_project_mapping(self, __event_emitter__: Callable = None) -> dict: """Get project ID to name mapping for displaying project names instead of IDs.""" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": "Fetching project data for name mapping...", "done": False, }, } ) projects_data = await self._make_goodday_request("projects?archived=true") project_id_to_name = {} if isinstance(projects_data, list): for p in projects_data: if isinstance(p, dict): project_id_to_name[p.get("id")] = p.get("name", "Unknown") if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Loaded {len(project_id_to_name)} projects for name mapping", "done": False, }, } ) return project_id_to_name async def _find_project_by_name( self, project_name: str, __event_emitter__: Callable = None ) -> tuple: """Find a project by name (case-insensitive). Returns (project_dict, available_projects_list).""" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Searching for project '{project_name}'...", "done": False, }, } ) projects_data = await self._make_goodday_request("projects") if not projects_data or not isinstance(projects_data, list): return None, [] # Filter out system projects (like sprints) to avoid overwhelming the AI filtered_projects = [ proj for proj in projects_data if isinstance(proj, dict) and proj.get("systemType") != "PROJECT" ] project_name_lower = project_name.lower().strip() matched_project = None available_projects = [] for proj in filtered_projects: if not isinstance(proj, dict): continue available_projects.append(proj.get("name", "Unknown")) current_project_name = proj.get("name", "").lower().strip() if current_project_name == project_name_lower: matched_project = proj break if ( project_name_lower in current_project_name or current_project_name in project_name_lower ): matched_project = proj break if __event_emitter__: if matched_project: await __event_emitter__( { "type": "status", "data": { "description": f"Found project '{matched_project.get('name')}' (ID: {matched_project.get('id')})", "done": False, }, } ) else: await __event_emitter__( { "type": "status", "data": { "description": f"Project '{project_name}' not found in {len(available_projects)} available projects", "done": False, }, } ) return matched_project, available_projects async def _find_sprint_by_name( self, project_id: str, sprint_name: str, __event_emitter__: Callable = None ) -> tuple: """Find a sprint project by name within a parent project. Returns (sprint_dict, available_sprints_list).""" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Searching for sprint '{sprint_name}' in project {project_id}...", "done": False, }, } ) projects_data = await self._make_goodday_request("projects") if not projects_data or not isinstance(projects_data, list): return None, [] normalized_sprint_name = sprint_name.lower().strip() if not normalized_sprint_name.startswith("sprint"): normalized_sprint_name = f"sprint {normalized_sprint_name}" available_sprints = [] search_number = re.search(r"(\d+)", normalized_sprint_name) exact_match = None substring_match = None for proj in projects_data: if isinstance(proj, dict) and proj.get("systemType") == "PROJECT": sprint_proj_name = proj.get("name", "").lower().strip() if sprint_proj_name.startswith("sprint"): available_sprints.append(proj.get("name", "")) project_number = re.search(r"(\d+)", sprint_proj_name) # Prefer exact number match if ( search_number and project_number and search_number.group(1) == project_number.group(1) ): exact_match = proj # Fallback: search number as substring anywhere in the sprint name elif search_number and search_number.group(1) in sprint_proj_name: if not substring_match: substring_match = proj elif normalized_sprint_name == sprint_proj_name: if not exact_match: exact_match = proj elif ( normalized_sprint_name in sprint_proj_name or sprint_proj_name in normalized_sprint_name ): if not substring_match: substring_match = proj result_sprint = exact_match if exact_match else substring_match if __event_emitter__: if result_sprint: await __event_emitter__( { "type": "status", "data": { "description": f"Found sprint '{result_sprint.get('name')}' (ID: {result_sprint.get('id')})", "done": False, }, } ) else: await __event_emitter__( { "type": "status", "data": { "description": f"Sprint '{sprint_name}' not found. Available sprints: {len(available_sprints)}", "done": False, }, } ) return result_sprint, available_sprints async def _find_user_by_name_or_email( self, user_identifier: str, __event_emitter__: Callable = None ) -> tuple: """Find a user by name or email (case-insensitive). Returns (user_dict, available_users_list).""" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Searching for user '{user_identifier}'...", "done": False, }, } ) users_data = await self._make_goodday_request("users") if not users_data or not isinstance(users_data, list): return None, [] user_lower = user_identifier.lower().strip() matched_user = None available_users = [] for u in users_data: if not isinstance(u, dict): continue available_users.append(u.get("name", "Unknown")) name = u.get("name", "").lower().strip() email = u.get("email", "").lower().strip() if user_lower == name or user_lower == email: matched_user = u break if user_lower in name or user_lower in email: matched_user = u break if __event_emitter__: if matched_user: await __event_emitter__( { "type": "status", "data": { "description": f"Found user '{matched_user.get('name')}' (ID: {matched_user.get('id')})", "done": False, }, } ) else: await __event_emitter__( { "type": "status", "data": { "description": f"User '{user_identifier}' not found in {len(available_users)} available users", "done": False, }, } ) return matched_user, available_users # Project Management Tools async def get_goodday_projects( self, archived: bool = True, root_only: bool = False, __request__: Request = None, __user__: dict = None, __event_emitter__: Callable = None, ) -> str: """ Get list of projects from Goodday project management :param archived: Set to true to retrieve archived/closed projects :param root_only: Set to true to return only root projects """ archived = True # Force always true if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": "Fetching Goodday projects...", "done": False, }, } ) try: if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Building API endpoint with archived={archived}, root_only={root_only}...", "done": False, }, } ) params = [] if archived: params.append("archived=true") if root_only: params.append("rootOnly=true") endpoint = "projects" if params: endpoint += "?" + "&".join(params) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching projects from endpoint: {endpoint}...", "done": False, }, } ) data = await self._make_goodday_request(endpoint) if not data: return "No projects found." if isinstance(data, dict): if "error" in data: return f"Unable to fetch projects: {data.get('error', 'Unknown error')}" elif not isinstance(data, list): return ( f"Unexpected response format: {type(data).__name__} - {str(data)}" ) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Retrieved {len(data)} projects, filtering and formatting results...", "done": False, }, } ) # Filter out system projects (like sprints) to avoid overwhelming the AI filtered_projects = [ project for project in data if isinstance(project, dict) and project.get("systemType") != "PROJECT" ] projects = [self._format_project(project) for project in filtered_projects] result = "\n---\n".join(projects) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": "Successfully retrieved projects", "done": True, }, } ) return f"**Goodday Projects:**\n\n{result}" except Exception as e: error_msg = f"Failed to retrieve projects: {str(e)}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) return error_msg # Task Management Tools async def get_goodday_project_tasks( self, project_name: str, closed: bool = True, # Always True subfolders: bool = True, # Always True __request__: Request = None, __user__: dict = None, __event_emitter__: Callable = None, ) -> str: """ Get tasks from a specific Goodday project by project name (case-insensitive) :param project_name: The name of the project (required, case-insensitive) :param closed: Set to true to retrieve all open and closed tasks :param subfolders: Set to true to include tasks from subfolders """ closed = True # Force always true subfolders = True # Force always true if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Finding project '{project_name}'...", "done": False, }, } ) try: # Use helper method to find project matched_project, available_projects = await self._find_project_by_name( project_name, __event_emitter__ ) if not matched_project: return f"Project '{project_name}' not found. Available projects: {', '.join(available_projects[:10])}{'...' if len(available_projects) > 10 else ''}" project_id = matched_project.get("id") actual_project_name = matched_project.get("name") if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Building API endpoint for project '{actual_project_name}' (ID: {project_id})...", "done": False, }, } ) params = [] if closed: params.append("closed=true") endpoint = f"project/{project_id}/tasks" if params: endpoint += "?" + "&".join(params) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching tasks from endpoint: {endpoint}...", "done": False, }, } ) data = await self._make_goodday_request(endpoint) if not data: return f"No tasks found in project '{actual_project_name}'." if isinstance(data, dict) and "error" in data: return f"Unable to fetch tasks: {data.get('error', 'Unknown error')}" if not isinstance(data, list): return f"Unexpected response format: {str(data)}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Found {len(data)} tasks in project '{actual_project_name}'", "done": False, }, } ) # Get user mapping using helper method user_id_to_name = await self._get_user_mapping(__event_emitter__) user_display = self._create_user_display_function(user_id_to_name) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Formatting {len(data)} tasks with user names...", "done": False, }, } ) tasks = [ self._format_task(task).replace( f"**Assigned To:** {task.get('assignedToUserId', 'N/A')}", f"**Assigned To:** {user_display(task.get('assignedToUserId'))}", ) for task in data ] result = "\n---\n".join(tasks) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Successfully retrieved project tasks for '{actual_project_name}'", "done": True, }, } ) return f"**Project '{actual_project_name}' Tasks:**\n\n{result}" except Exception as e: error_msg = f"Failed to retrieve project tasks: {str(e)}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) return error_msg async def get_goodday_sprint_tasks( self, project_name: str, sprint_name: str, closed: bool = True, # Always True __request__: Request = None, __user__: dict = None, __event_emitter__: Callable = None, ) -> str: """ Get tasks from a specific sprint by project name and sprint name :param project_name: The name of the project (e.g., "ASTRA", "Astra") :param sprint_name: The name of the sprint (e.g., "Sprint 233", "233") :param closed: Set to true to retrieve all open and closed tasks """ closed = True # Force always true if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Finding project '{project_name}' and sprint '{sprint_name}'...", "done": False, }, } ) try: # Use helper method to find the main project main_project, available_projects = await self._find_project_by_name( project_name, __event_emitter__ ) if not main_project: return f"Project '{project_name}' not found. Available projects: {', '.join(available_projects[:10])}{'...' if len(available_projects) > 10 else ''}" project_id = main_project.get("id") actual_project_name = main_project.get("name") if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Found main project '{actual_project_name}', searching for sprint '{sprint_name}'...", "done": False, }, } ) # Use helper method to find the sprint sprint_project, available_sprints = await self._find_sprint_by_name( project_id, sprint_name, __event_emitter__ ) if not sprint_project: if available_sprints: return f"Sprint '{sprint_name}' not found in project {project_id}. Available sprints: {', '.join(available_sprints)}" else: return f"No sprints found in project {project_id}. Make sure the project ID is correct and contains sprint sub-projects." sprint_id = sprint_project.get("id") actual_sprint_name = sprint_project.get("name") if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Building API endpoint for sprint '{actual_sprint_name}'...", "done": False, }, } ) params = [] if closed: params.append("closed=true") endpoint = f"project/{sprint_id}/tasks" if params: endpoint += "?" + "&".join(params) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching tasks from endpoint: {endpoint}...", "done": False, }, } ) data = await self._make_goodday_request(endpoint) if not data: return f"No tasks found in sprint '{actual_sprint_name}'." if isinstance(data, dict) and "error" in data: return f"Unable to fetch sprint tasks: {data.get('error', 'Unknown error')}" if not isinstance(data, list): return f"Unexpected response format: {str(data)}" if len(data) == 0: return f"Sprint '{actual_sprint_name}' exists but contains no tasks." if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Found {len(data)} tasks in sprint '{actual_sprint_name}'", "done": False, }, } ) # Get user mapping using helper method user_id_to_name = await self._get_user_mapping(__event_emitter__) user_display = self._create_user_display_function(user_id_to_name) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Formatting {len(data)} tasks with user names...", "done": False, }, } ) tasks = [ self._format_task(task).replace( f"**Assigned To:** {task.get('assignedToUserId', 'N/A')}", f"**Assigned To:** {user_display(task.get('assignedToUserId'))}", ) for task in data ] result = "\n---\n".join(tasks) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Successfully retrieved {len(data)} tasks from sprint '{actual_sprint_name}'", "done": True, }, } ) return f"**Sprint '{actual_sprint_name}' Tasks ({len(data)} tasks):**\n\n{result}" except Exception as e: error_msg = f"Failed to retrieve sprint tasks: {str(e)}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) return error_msg async def get_goodday_smart_query( self, query: str, __request__: Request = None, __user__: dict = None, __event_emitter__: Callable = None, ) -> str: """ Smart query function that interprets natural language requests for Goodday data :param query: Natural language query (e.g., "get tasks from sprint 233", "tasks assigned to Roney Dsilva", "Sprint 102 tasks from ASTRA project") """ if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Processing query: '{query}'...", "done": False, }, } ) try: query_lower = query.lower().strip() # Pattern: "get tasks in sprint X in PROJECT" or "sprint X tasks from PROJECT" or "sprint summary" if "sprint" in query_lower and ( "tasks" in query_lower or "task" in query_lower or "summary" in query_lower ): # Extract sprint number/name and project name # More flexible regex patterns sprint_patterns = [ r"sprint\s+(\w+)", r"spring\s+(\w+)", # Handle typos like "spring" instead of "sprint" ] project_patterns = [ r"(?:from|in|project)\s+(\w+)", r"(\w+)\s+project", ] sprint_name = None project_name = None # Try to find sprint name for pattern in sprint_patterns: sprint_match = re.search(pattern, query_lower) if sprint_match: sprint_name = sprint_match.group(1) break # Try to find project name for pattern in project_patterns: project_match = re.search(pattern, query_lower) if project_match: project_name = project_match.group(1) break if sprint_name: if project_name: # Check if it's a summary request if "summary" in query_lower: return await self.get_goodday_sprint_summary( project_name=project_name, sprint_name=sprint_name, __request__=__request__, __user__=__user__, __event_emitter__=__event_emitter__, ) else: return await self.get_goodday_sprint_tasks( project_name=project_name, sprint_name=sprint_name, __request__=__request__, __user__=__user__, __event_emitter__=__event_emitter__, ) return f"Please specify the project name. Example: 'get tasks from sprint {sprint_name} in ASTRA project'" # Pattern: "tasks assigned to USER" or "USER tasks" if ( "assigned to" in query_lower or "tasks for" in query_lower ) and "task" in query_lower: # Extract user name or email if "assigned to" in query_lower: user_part = query_lower.split("assigned to")[1].strip() elif "tasks for" in query_lower: user_part = query_lower.split("tasks for")[1].strip() else: user_part = "" user_value = user_part.replace("user", "").replace("tasks", "").strip() if user_value: return await self.get_goodday_user_tasks( user=user_value, __request__=__request__, __user__=__user__, __event_emitter__=__event_emitter__, ) # Pattern: "get messages from TASK-ID" or "messages for TASK-ID" if ("message" in query_lower or "messages" in query_lower) and ( "from" in query_lower or "for" in query_lower ): # Extract task ID and project name task_id_pattern = r"(?:from|for)\s+([A-Z]+-\d+)" task_match = re.search(task_id_pattern, query_lower) # Try to extract project name project_patterns = [ r"(?:from|in|project)\s+(\w+)", r"(\w+)\s+project", ] project_name = None for pattern in project_patterns: project_match = re.search(pattern, query_lower) if project_match: project_name = project_match.group(1) break if task_match: task_short_id = task_match.group(1).upper() if project_name: return await self.get_goodday_task_messages( task_short_id=task_short_id, project_name=project_name, __request__=__request__, __user__=__user__, __event_emitter__=__event_emitter__, ) else: return f"Please specify the project name. Example: 'get messages from {task_short_id} in ASTRA project'" # Pattern: "get task TASK-ID" or "task details for TASK-ID" or "details for TASK-ID" if ( ("task" in query_lower and "detail" in query_lower) or ( "details" in query_lower and ("for" in query_lower or "of" in query_lower) ) or ("get task" in query_lower) ) and not ("tasks" in query_lower or "message" in query_lower): # Extract task ID and project name task_id_patterns = [ r"(?:task|for|of)\s+([A-Z]+-\d+)", r"([A-Z]+-\d+)", # Just the task ID pattern ] # Try to extract project name project_patterns = [ r"(?:from|in|project)\s+(\w+)", r"(\w+)\s+project", ] project_name = None for pattern in project_patterns: project_match = re.search(pattern, query_lower) if project_match: project_name = project_match.group(1) break task_short_id = None for pattern in task_id_patterns: task_match = re.search(pattern, query_lower) if task_match: task_short_id = task_match.group(1).upper() break if task_short_id: if project_name: return await self.get_goodday_task_details( task_short_id=task_short_id, project_name=project_name, __request__=__request__, __user__=__user__, __event_emitter__=__event_emitter__, ) else: return f"Please specify the project name. Example: 'get task {task_short_id} in ASTRA project'" # Pattern: "USER tasks" (e.g., "Roney Dsilva tasks") if "task" in query_lower and not any( keyword in query_lower for keyword in [ "sprint", "spring", "project", "assigned", "message", "detail", ] ): user_value = ( query_lower.replace("tasks", "") .replace("task", "") .replace("get", "") .strip() ) if user_value: return await self.get_goodday_user_tasks( user=user_value, __request__=__request__, __user__=__user__, __event_emitter__=__event_emitter__, ) # Pattern: "search for QUERY" or "find tasks QUERY" if ( "search" in query_lower or "find" in query_lower ) and "task" in query_lower: # Extract search query search_terms = query_lower # Remove common words for word in ["search", "for", "find", "tasks", "task", "get"]: search_terms = search_terms.replace(word, "").strip() if search_terms: return await self.search_goodday_tasks( query=search_terms, __request__=__request__, __user__=__user__, __event_emitter__=__event_emitter__, ) # If no pattern matches, provide suggestions return """I couldn't understand your query. Here are some examples of what I can help with: **Search Tasks:** - "search for Security tasks" - "find tasks S3 upload" - "search tasks security improvements" **Sprint Tasks:** - "get tasks from sprint 233 in ASTRA project" - "get tasks in sprint 122 in Astra project" - "sprint 102 tasks from PROJECT_NAME" - "tasks from sprint 233" **Sprint Summary:** - "get sprint 233 summary from ASTRA project" - "sprint 102 summary in Astra project" - "summary of sprint 233 from ASTRA" - "sprint summary for 102 in Astra" **User Tasks:** - "tasks assigned to Roney Dsilva" - "Roney Dsilva tasks" - "tasks for John Smith" **Task Messages:** - "get messages from RAD-434 in ASTRA project" - "messages for ABC-123 in PROJECT_NAME" - "get all messages from TASK-456 in Astra" **Task Details:** - "get task RAD-434 in ASTRA project" - "task details for ABC-123 in PROJECT_NAME" - "details for TASK-456 in Astra" - "get details of RAD-434 in ASTRA" **Other Options:** - Use specific function names like `get_goodday_projects()` for projects - Use `get_goodday_users()` to see available users - Use `get_goodday_project_tasks()` for project tasks - Use `search_goodday_tasks()` for semantic search - Use `get_goodday_sprint_summary()` for detailed sprint analysis Please try rephrasing your query with one of these patterns.""" except Exception as e: error_msg = f"Failed to process query: {str(e)}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) return error_msg async def get_goodday_user_tasks( self, user: str, closed: bool = True, # Always True __request__: Request = None, __user__: dict = None, __event_emitter__: Callable = None, ) -> str: """ Get tasks assigned to a user by name or email (case-insensitive). :param user: User name or email (case-insensitive) :param closed: Set to true to retrieve all open and closed tasks """ closed = True # Force always true if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": f"Finding user '{user}'...", "done": False}, } ) try: # Use helper method to find user matched_user, available_users = await self._find_user_by_name_or_email( user, __event_emitter__ ) if not matched_user: return f"User '{user}' not found. Available users: {', '.join(available_users[:10])}{'...' if len(available_users) > 10 else ''}" user_id = matched_user.get("id") actual_user_name = matched_user.get("name") if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Building API endpoint for user '{actual_user_name}' (ID: {user_id})...", "done": False, }, } ) params = [] if closed: params.append("closed=true") endpoint = f"user/{user_id}/assigned-tasks" if params: endpoint += "?" + "&".join(params) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching tasks from endpoint: {endpoint}...", "done": False, }, } ) data = await self._make_goodday_request(endpoint) if not data: return f"No tasks found assigned to '{actual_user_name}'." if isinstance(data, dict) and "error" in data: return ( f"Unable to fetch user tasks: {data.get('error', 'Unknown error')}" ) if not isinstance(data, list): return f"Unexpected response format: {str(data)}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Found {len(data)} tasks assigned to '{actual_user_name}'", "done": False, }, } ) # Get user mapping for consistent user display in tasks user_id_to_name = await self._get_user_mapping(__event_emitter__) user_display = self._create_user_display_function(user_id_to_name) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Formatting {len(data)} tasks with user names...", "done": False, }, } ) tasks = [ self._format_task(task).replace( f"**Assigned To:** {task.get('assignedToUserId', 'N/A')}", f"**Assigned To:** {user_display(task.get('assignedToUserId'))}", ) for task in data ] result = "\n---\n".join(tasks) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Successfully retrieved tasks for '{actual_user_name}'", "done": True, }, } ) return f"**Tasks assigned to '{actual_user_name}':**\n\n{result}" except Exception as e: error_msg = f"Failed to retrieve user tasks: {str(e)}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) return error_msg async def get_goodday_task_messages( self, task_short_id: str, project_name: str, __request__: Request = None, __user__: dict = None, __event_emitter__: Callable = None, ) -> str: """ Get messages from a specific task by its short ID (e.g., RAD-434) within a specified project. :param task_short_id: The short ID of the task (e.g., RAD-434) :param project_name: The name of the project containing the task (required, case-insensitive) """ if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Finding project '{project_name}'...", "done": False, }, } ) try: # Get all projects and find the one matching project_name (case-insensitive) projects_data = await self._make_goodday_request("projects") if not projects_data or not isinstance(projects_data, list): return "Unable to fetch projects to search for task." # Filter out system projects (like sprints) to avoid overwhelming the AI filtered_projects = [ proj for proj in projects_data if isinstance(proj, dict) and proj.get("systemType") != "PROJECT" ] project_name_lower = project_name.lower().strip() matched_project = None for proj in filtered_projects: if not isinstance(proj, dict): continue current_project_name = proj.get("name", "").lower().strip() if current_project_name == project_name_lower: matched_project = proj break if ( project_name_lower in current_project_name or current_project_name in project_name_lower ): matched_project = proj break if not matched_project: available_projects = [ p.get("name", "Unknown") for p in projects_data if isinstance(p, dict) ] return f"Project '{project_name}' not found. Available projects: {', '.join(available_projects[:10])}{'...' if len(available_projects) > 10 else ''}" project_id = matched_project.get("id") found_in_project = matched_project.get("name") if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Searching for task '{task_short_id}' in project '{found_in_project}'...", "done": False, }, } ) # Search for the task in the specified project endpoint = f"project/{project_id}/tasks" tasks_data = await self._make_goodday_request(endpoint) if not tasks_data or not isinstance(tasks_data, list): return f"Unable to fetch tasks for project '{found_in_project}'." task_id = None task_name = None for task in tasks_data: if isinstance(task, dict) and task.get("shortId") == task_short_id: task_id = task.get("id") task_name = task.get("name") break if not task_id: return f"Task with short ID '{task_short_id}' not found in project '{found_in_project}'. Please verify the task ID and project name are correct." if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Found task '{task_name}' in project '{found_in_project}', fetching messages...", "done": False, }, } ) # Now get the messages for this task endpoint = f"task/{task_id}/messages" messages_data = await self._make_goodday_request(endpoint) if not messages_data: return f"No messages found for task '{task_short_id}' ({task_name})." if isinstance(messages_data, dict) and "error" in messages_data: return f"Unable to fetch task messages: {messages_data.get('error', 'Unknown error')}" if not isinstance(messages_data, list): return f"Unexpected response format: {str(messages_data)}" if len(messages_data) == 0: return f"Task '{task_short_id}' ({task_name}) has no messages." # Get all users for name lookup users_data = await self._make_goodday_request("users") user_id_to_name = {} if isinstance(users_data, list): for u in users_data: if isinstance(u, dict): user_id_to_name[u.get("id")] = u.get("name", "Unknown") def user_display(user_id): if not user_id: return "N/A" name = user_id_to_name.get(user_id) return f"{name} ({user_id})" if name else user_id # Format the messages formatted_messages = [] for msg in messages_data: if not isinstance(msg, dict): continue formatted_msg = f""" **Message ID:** {msg.get('id', 'N/A')} **Date Created:** {msg.get('dateCreated', 'N/A')} **From User:** {user_display(msg.get('fromUserId'))} **To User:** {user_display(msg.get('toUserId'))} **Message:** {msg.get('message', 'No message content')} **Task Status ID:** {msg.get('taskStatusId', 'N/A')} **Time Report ID:** {msg.get('timeReportId', 'N/A')} **Edit By User:** {user_display(msg.get('editByUserId'))} **Edit Date:** {msg.get('editDate', 'N/A')} """.strip() formatted_messages.append(formatted_msg) result = "\n---\n".join(formatted_messages) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Successfully retrieved {len(messages_data)} messages for task '{task_short_id}'", "done": True, }, } ) return f"**Messages for Task '{task_short_id}' ({task_name}) in project '{found_in_project}' - {len(messages_data)} messages:**\n\n{result}" except Exception as e: error_msg = f"Failed to retrieve task messages: {str(e)}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) return error_msg async def get_goodday_task_details( self, task_short_id: str, project_name: str, __request__: Request = None, __user__: dict = None, __event_emitter__: Callable = None, ) -> str: """ Get detailed information about a specific task by its short ID (e.g., RAD-434) within a specified project. :param task_short_id: The short ID of the task (e.g., RAD-434) :param project_name: The name of the project containing the task (required, case-insensitive) """ if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Finding project '{project_name}'...", "done": False, }, } ) try: # Get all projects and find the one matching project_name (case-insensitive) projects_data = await self._make_goodday_request("projects") if not projects_data or not isinstance(projects_data, list): return "Unable to fetch projects to search for task." # Filter out system projects (like sprints) to avoid overwhelming the AI filtered_projects = [ proj for proj in projects_data if isinstance(proj, dict) and proj.get("systemType") != "PROJECT" ] project_name_lower = project_name.lower().strip() matched_project = None for proj in filtered_projects: if not isinstance(proj, dict): continue current_project_name = proj.get("name", "").lower().strip() if current_project_name == project_name_lower: matched_project = proj break if ( project_name_lower in current_project_name or current_project_name in project_name_lower ): matched_project = proj break if not matched_project: available_projects = [ p.get("name", "Unknown") for p in projects_data if isinstance(p, dict) ] return f"Project '{project_name}' not found. Available projects: {', '.join(available_projects[:10])}{'...' if len(available_projects) > 10 else ''}" project_id = matched_project.get("id") found_in_project = matched_project.get("name") if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Searching for task '{task_short_id}' in project '{found_in_project}'...", "done": False, }, } ) # Search for the task in the specified project endpoint = f"project/{project_id}/tasks" tasks_data = await self._make_goodday_request(endpoint) if not tasks_data or not isinstance(tasks_data, list): return f"Unable to fetch tasks for project '{found_in_project}'." task_id = None for task in tasks_data: if isinstance(task, dict) and task.get("shortId") == task_short_id: task_id = task.get("id") break if not task_id: return f"Task with short ID '{task_short_id}' not found in project '{found_in_project}'. Please verify the task ID and project name are correct." if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Found task in project '{found_in_project}', fetching detailed information...", "done": False, }, } ) # Now get the detailed task information endpoint = f"task/{task_id}" task_data = await self._make_goodday_request(endpoint) if not task_data: return f"No details found for task '{task_short_id}'." if isinstance(task_data, dict) and "error" in task_data: return f"Unable to fetch task details: {task_data.get('error', 'Unknown error')}" if not isinstance(task_data, dict): return f"Unexpected response format: {str(task_data)}" # Fetch messages for this task and use the first message as the description endpoint = f"task/{task_id}/messages" messages_data = await self._make_goodday_request(endpoint) first_message = None if ( messages_data and isinstance(messages_data, list) and len(messages_data) > 0 ): first_msg = messages_data[0] if isinstance(first_msg, dict): first_message = first_msg.get("message", "No description") else: first_message = "No description" # Get all users for name lookup users_data = await self._make_goodday_request("users") user_id_to_name = {} if isinstance(users_data, list): for u in users_data: if isinstance(u, dict): user_id_to_name[u.get("id")] = u.get("name", "Unknown") def user_display(user_id): if not user_id: return "N/A" name = user_id_to_name.get(user_id) return f"{name} ({user_id})" if name else user_id # Format the task details status = ( task_data.get("status", {}) if isinstance(task_data.get("status"), dict) else {} ) task_type = ( task_data.get("taskType", {}) if isinstance(task_data.get("taskType"), dict) else {} ) custom_fields = ( task_data.get("customFieldsData", {}) if isinstance(task_data.get("customFieldsData"), dict) else {} ) subtasks = ( task_data.get("subtasks", []) if isinstance(task_data.get("subtasks"), list) else [] ) users = ( task_data.get("users", []) if isinstance(task_data.get("users"), list) else [] ) formatted_details = f""" **Task ID:** {task_data.get('shortId', 'N/A')} **Name:** {task_data.get('name', 'N/A')} **Project:** {found_in_project} **Status:** {status.get('name', 'N/A')}) **Task Type:** {task_type.get('name', 'N/A')}) **System Status:** {task_data.get('systemStatus', 'N/A')} **System Type:** {task_data.get('systemType', 'N/A')} **Priority:** {task_data.get('priority', 'N/A')} **Assigned To:** {user_display(task_data.get('assignedToUserId'))} **Action Required:** {user_display(task_data.get('actionRequiredUserId'))} **Created By:** {user_display(task_data.get('createdByUserId'))} **Start Date:** {task_data.get('startDate', 'N/A')} **End Date:** {task_data.get('endDate', 'N/A')} **Deadline:** {task_data.get('deadline', 'N/A')} **Schedule Date:** {task_data.get('scheduleDate', 'N/A')} **Schedule Status:** {task_data.get('scheduleStatus', 'N/A')} **Estimate:** {task_data.get('estimate', 'N/A')} **Reported Time:** {task_data.get('reportedTime', 'N/A')} **Moment Created:** {task_data.get('momentCreated', 'N/A')} **Moment Closed:** {task_data.get('momentClosed', 'N/A')} **Recent Activity:** {task_data.get('recentActivityMoment', 'N/A')} **Parent Task ID:** {task_data.get('parentTaskId', 'N/A')} **Users:** {', '.join([user_display(uid) for uid in users]) if users else 'N/A'} **Subtasks Count:** {len(subtasks)} **Description:** {first_message} """.strip() # Add custom fields if they exist if custom_fields: formatted_details += "\n\n**Custom Fields:**" for field_id, field_value in custom_fields.items(): formatted_details += f"\n- {field_id}: {field_value}" # Add subtasks if they exist if subtasks: formatted_details += f"\n\n**Subtasks ({len(subtasks)}):**" for i, subtask in enumerate( subtasks[:10] ): # Limit to first 10 subtasks if isinstance(subtask, dict): formatted_details += f"\n- {subtask.get('shortId', 'N/A')}: {subtask.get('name', 'N/A')}" else: formatted_details += f"\n- Subtask {i+1}: {subtask}" if len(subtasks) > 10: formatted_details += f"\n... and {len(subtasks) - 10} more subtasks" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Successfully retrieved details for task '{task_short_id}'", "done": True, }, } ) return f"**Task Details for '{task_short_id}' in project '{found_in_project}':**\n\n{formatted_details}" except Exception as e: error_msg = f"Failed to retrieve task details: {str(e)}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) return error_msg # Search Tools async def search_goodday_tasks( self, query: str, __request__: Request = None, __user__: dict = None, __event_emitter__: Callable = None, ) -> str: """ Search for tasks in Goodday using semantic search :param query: Search query to find relevant tasks (e.g., "security task", "UI improvements") """ # Validate and sanitize the query parameter if not query or not isinstance(query, str): return "Error: Search query must be a non-empty string" query = query.strip() if not query: return "Error: Search query cannot be empty" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Searching for tasks with query: '{query}'...", "done": False, }, } ) try: if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Preparing search request with query: '{query}'...", "done": False, }, } ) # Make the search request params = {"query": query} data = await self._make_search_request(params=params) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": "Search API response received, processing results...", "done": False, }, } ) if not data: return f"No search results found for query: '{query}'" if isinstance(data, dict) and "error" in data: return f"Search error: {data.get('error', 'Unknown error')}" # Handle the response format - expecting a list with result objects if not isinstance(data, list) or len(data) == 0: return f"No search results found for query: '{query}'" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Parsing search response structure with {len(data)} items...", "done": False, }, } ) # Extract results from the first item in the list first_item = data[0] if not isinstance(first_item, dict) or "result" not in first_item: return f"Unexpected search response format: {str(data)}" results = first_item.get("result", []) if not isinstance(results, list) or len(results) == 0: return f"No search results found for query: '{query}'" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Found {len(results)} raw results, deduplicating by task ID...", "done": False, }, } ) # Format the search results formatted_results = [] seen_task_ids = set() for result in results: if not isinstance(result, dict): continue task_id = result.get("taskId") or "N/A" # Group by task ID to avoid duplicates if task_id not in seen_task_ids: seen_task_ids.add(task_id) formatted_results.append(self._format_search_result(result)) else: # For duplicate task IDs, append additional content if different existing_result = None for i, existing in enumerate(formatted_results): if task_id in existing: existing_result = i break if existing_result is not None: current_content = result.get("content") or "" if ( current_content and current_content not in formatted_results[existing_result] ): formatted_results[ existing_result ] += f"\n**Additional Content:** {current_content}" result_text = "\n---\n".join(formatted_results) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Found {len(formatted_results)} unique tasks matching '{query}'", "done": True, }, } ) return f"**Search Results for '{query}' ({len(formatted_results)} unique tasks found):**\n\n{result_text}" except Exception as e: error_msg = f"Failed to search tasks: {str(e)}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) return error_msg async def get_goodday_sprint_summary( self, project_name: str, sprint_name: str, __request__: Request = None, __user__: dict = None, __event_emitter__: Callable = None, ) -> str: """ Get a sprint summary with task details, status distribution, and key metrics. :param project_name: The name of the main project (e.g., "ASTRA", "Astra") :param sprint_name: The name or number of the sprint (e.g., "Sprint 233", "233") """ if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Finding sprint '{sprint_name}' in project '{project_name}'...", "done": False, }, } ) def _find_main_project(projects_data, project_name): """Helper function to find main project (reused from sprint_tasks)""" project_name_lower = project_name.lower().strip() for proj in projects_data: if not isinstance(proj, dict): continue current_project_name = proj.get("name", "").lower().strip() if current_project_name == project_name_lower: return proj if ( project_name_lower in current_project_name or current_project_name in project_name_lower ): return proj return None def _find_sprint_project(projects_data, _unused_parent_project_id, sprint_name): """Helper function to find sprint project (reused from sprint_tasks)""" normalized_sprint_name = sprint_name.lower().strip() if not normalized_sprint_name.startswith("sprint"): normalized_sprint_name = f"sprint {normalized_sprint_name}" available_sprints = [] search_number = re.search(r"(\d+)", normalized_sprint_name) exact_match = None substring_match = None for proj in projects_data: if isinstance(proj, dict) and proj.get("systemType") == "PROJECT": sprint_proj_name = proj.get("name", "").lower().strip() if sprint_proj_name.startswith("sprint"): available_sprints.append(proj.get("name", "")) project_number = re.search(r"(\d+)", sprint_proj_name) # Prefer exact number match if ( search_number and project_number and search_number.group(1) == project_number.group(1) ): exact_match = proj # Fallback: search number as substring anywhere in the sprint name elif ( search_number and search_number.group(1) in sprint_proj_name ): if not substring_match: substring_match = proj elif normalized_sprint_name == sprint_proj_name: if not exact_match: exact_match = proj elif ( normalized_sprint_name in sprint_proj_name or sprint_proj_name in normalized_sprint_name ): if not substring_match: substring_match = proj if exact_match: return exact_match, available_sprints if substring_match: return substring_match, available_sprints return None, available_sprints try: # Find main project projects_data = await self._make_goodday_request("projects") if not projects_data or not isinstance(projects_data, list): return "Unable to fetch projects." main_project = _find_main_project(projects_data, project_name) if not main_project: available_projects = [ p.get("name", "Unknown") for p in projects_data if isinstance(p, dict) ] return f"Project '{project_name}' not found. Available projects: {', '.join(available_projects[:10])}{'...' if len(available_projects) > 10 else ''}" main_project_id = main_project.get("id") actual_project_name = main_project.get("name") # Print all sprint projects under the main project sprint_projects = [ p for p in projects_data if isinstance(p, dict) and p.get("name", "").lower().startswith("sprint") and p.get("parentProjectId") == main_project_id ] if sprint_projects: sprint_list = "\n".join( [ f"- {p.get('name', 'Unknown')} (ID: {p.get('id', 'N/A')})" for p in sprint_projects ] ) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Sprints under project '{actual_project_name}':\n{sprint_list}", "done": False, }, } ) else: if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"No sprints found under project '{actual_project_name}'.", "done": False, }, } ) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Found project '{actual_project_name}', finding sprint...", "done": False, }, } ) # Get all projects again to find the sprint projects_data = await self._make_goodday_request("projects") if not projects_data or not isinstance(projects_data, list): return "Unable to fetch projects to find sprint." # Find sprint project sprint_project, available_sprints = _find_sprint_project( projects_data, main_project_id, sprint_name ) if not sprint_project: if available_sprints: return f"Sprint '{sprint_name}' not found in project {main_project_id}. Available sprints: {', '.join(available_sprints)}" else: return f"No sprints found in project {main_project_id}. Make sure the project ID is correct and contains sprint sub-projects." sprint_id = sprint_project.get("id") actual_sprint_name = sprint_project.get("name") if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Found sprint '{actual_sprint_name}', analyzing tasks...", "done": False, }, } ) # Get all tasks in the sprint with closed tasks included - using same logic as get_goodday_sprint_tasks params = [] params.append("closed=true") # Always include closed tasks for summary endpoint = f"project/{sprint_id}/tasks" if params: endpoint += "?" + "&".join(params) tasks_data = await self._make_goodday_request(endpoint) if not tasks_data: return f"No tasks found in sprint '{actual_sprint_name}'." if isinstance(tasks_data, dict) and "error" in tasks_data: return f"Unable to fetch sprint tasks: {tasks_data.get('error', 'Unknown error')}" if not isinstance(tasks_data, list): return f"Unexpected response format: {str(tasks_data)}" if len(tasks_data) == 0: return f"Sprint '{actual_sprint_name}' exists but contains no tasks." # Get users data for name mapping users_data = await self._make_goodday_request("users") user_id_to_name = {} if isinstance(users_data, list): for u in users_data: if isinstance(u, dict): user_id_to_name[u.get("id")] = u.get("name", "Unknown") def user_display(user_id): if not user_id: return "Unassigned" name = user_id_to_name.get(user_id) return name if name else f"User {user_id}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Processing {len(tasks_data)} tasks and collecting descriptions...", "done": False, }, } ) # Analyze tasks and collect metrics status_counts = {} user_task_counts = {} task_summaries = [] for task in tasks_data: if not isinstance(task, dict): continue # Extract basic task information task_short_id = task.get("shortId", "N/A") task_name = task.get("name", "No title") task_id = task.get("id") status = ( task.get("status", {}) if isinstance(task.get("status"), dict) else {} ) status_name = status.get("name", "Unknown Status") assigned_user_id = task.get("assignedToUserId") assigned_user = user_display(assigned_user_id) # Count statistics status_counts[status_name] = status_counts.get(status_name, 0) + 1 user_task_counts[assigned_user] = ( user_task_counts.get(assigned_user, 0) + 1 ) # Get the first message as task description task_description = "No description available" if task_id: try: messages_endpoint = f"task/{task_id}/messages" messages_data = await self._make_goodday_request( messages_endpoint ) if ( messages_data and isinstance(messages_data, list) and len(messages_data) > 0 ): first_msg = messages_data[0] if isinstance(first_msg, dict): task_description = first_msg.get( "message", "No description available" ) except Exception: # If we can't get messages, continue without description pass # Create task summary with title and description task_summary = f""" **{task_short_id}**: {task_name} - **Status**: {status_name} - **Assigned To**: {assigned_user} - **Description**: {task_description} """.strip() task_summaries.append(task_summary) # Build comprehensive summary summary_parts = [] # Sprint Overview summary_parts.append( f"""**Sprint Overview:** - **Sprint**: {actual_sprint_name} - **Project**: {actual_project_name} - **Total Tasks**: {len(tasks_data)}""" ) # Status Distribution if status_counts: status_list = [ f" - {status}: {count}" for status, count in sorted(status_counts.items()) ] summary_parts.append( f"""**Status Distribution:** {chr(10).join(status_list)}""" ) # User Assignment Distribution if user_task_counts: user_list = [ f" - {user}: {count} tasks" for user, count in sorted( user_task_counts.items(), key=lambda x: x[1], reverse=True ) ] summary_parts.append( f"""**Task Assignment:** {chr(10).join(user_list)}""" ) # Individual Task Details if task_summaries: summary_parts.append( f"""**Task Details:** {chr(10).join(['---'] + task_summaries)}""" ) result = "\n\n".join(summary_parts) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Successfully generated sprint summary for '{actual_sprint_name}' with {len(tasks_data)} tasks", "done": True, }, } ) return f"**Sprint Summary for '{actual_sprint_name}' in '{actual_project_name}':**\n\n{result}" except Exception as e: error_msg = f"Failed to retrieve sprint summary: {str(e)}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) return error_msg async def search_project_documents( self, project_name: str, document_name: str = None, include_content: bool = False, __request__: Request = None, __user__: dict = None, __event_emitter__: Callable = None, ) -> str: """ Search for documents in a specific project by project name :param project_name: The name of the project to search in (case-insensitive) :param document_name: Optional document name to filter by (case-insensitive partial match) :param include_content: Whether to include the full content of each document (default: False) """ if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Searching for documents in project '{project_name}'...", "done": False, }, } ) try: # First, find the project ID by name if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Finding project '{project_name}'...", "done": False, }, } ) projects_data = await self._make_goodday_request("projects?archived=true") if not projects_data or not isinstance(projects_data, list): return "Unable to fetch projects or no projects found." # Filter to only PROJECT and FOLDER type projects project_type_projects = [p for p in projects_data if isinstance(p, dict) and p.get('systemType') in ['PROJECT', 'FOLDER']] # Find projects containing the search term in their name (case-insensitive) matching_projects = [] search_term = project_name.lower() for project in project_type_projects: if isinstance(project, dict) and search_term in project.get('name', '').lower(): matching_projects.append(project) if not matching_projects: return f"No projects found containing '{project_name}' in their name." # Use the first match target_project = matching_projects[0] actual_project_name = target_project.get('name') if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Found project '{actual_project_name}' (using first match of {len(matching_projects)} found)", "done": False, }, } ) project_id = target_project.get('id') if not project_id: return f"Project '{actual_project_name}' found but missing ID." # Get documents for the project if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching documents from project '{actual_project_name}'...", "done": False, }, } ) documents_data = await self._make_goodday_request(f"project/{project_id}/documents") if not documents_data: return f"No documents found in project '{actual_project_name}'." if isinstance(documents_data, dict) and "error" in documents_data: return f"Unable to fetch documents: {documents_data.get('error', 'Unknown error')}" if not isinstance(documents_data, list): return f"Unexpected response format for documents: {str(documents_data)}" # Filter documents by name if specified if document_name: filtered_documents = [] for doc in documents_data: if isinstance(doc, dict) and document_name.lower() in doc.get('name', '').lower(): filtered_documents.append(doc) documents_data = filtered_documents if not documents_data: return f"No documents found matching '{document_name}' in project '{actual_project_name}'." # Format documents if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Formatting {len(documents_data)} documents{' and fetching content' if include_content else ''}...", "done": False, }, } ) # Get user and project mappings for better display user_id_to_name = await self._get_user_mapping(__event_emitter__) project_id_to_name = await self._get_project_mapping(__event_emitter__) formatted_docs = [] for doc in documents_data: if isinstance(doc, dict): doc_id = doc.get('id', 'N/A') doc_content = "" # Fetch content if requested if include_content and doc_id != 'N/A': try: if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching content for document '{doc.get('name', 'N/A')}'...", "done": False, }, } ) content_data = await self._make_goodday_request(f"document/{doc_id}") if content_data: if isinstance(content_data, dict): doc_content = content_data.get('content', content_data.get('text', str(content_data))) else: doc_content = str(content_data) except Exception as e: doc_content = f"Error fetching content: {str(e)}" # Get display names project_id = doc.get('projectId', 'N/A') project_name = project_id_to_name.get(project_id, f"Project {project_id}") if project_id != 'N/A' else 'N/A' created_by_id = doc.get('createdByUserId', 'N/A') created_by_name = user_id_to_name.get(created_by_id, f"User {created_by_id}") if created_by_id != 'N/A' else 'N/A' # Format timestamps created_time = self._format_timestamp_ist(doc.get('momentCreated', 'N/A')) updated_time = self._format_timestamp_ist(doc.get('momentUpdated', 'N/A')) formatted_doc = f""" **Document ID:** {doc_id} **Name:** {doc.get('name', 'N/A')} **Project:** {project_name} **Created By:** {created_by_name} **Created:** {created_time} **Updated:** {updated_time}""" if include_content: formatted_doc += f"\n**Content:**\n{doc_content}" formatted_docs.append(formatted_doc.strip()) result = "\n---\n".join(formatted_docs) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Successfully found {len(documents_data)} documents", "done": True, }, } ) filter_text = f" matching '{document_name}'" if document_name else "" return f"**Documents in project '{actual_project_name}'{filter_text}:**\n\n{result}" except Exception as e: error_msg = f"Failed to search project documents: {str(e)}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) return error_msg async def get_document_content( self, document_id: str, __request__: Request = None, __user__: dict = None, __event_emitter__: Callable = None, ) -> str: """ Get the content of a specific document by its ID :param document_id: The ID of the document to retrieve """ if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching document content for ID '{document_id}'...", "done": False, }, } ) try: data = await self._make_goodday_request(f"document/{document_id}") if not data: return "Document not found or no content available." if isinstance(data, dict) and "error" in data: return f"Unable to fetch document: {data.get('error', 'Unknown error')}" # The document content might be in different formats # Assuming it's a dict with content field or direct content if isinstance(data, dict): content = data.get('content', data.get('text', str(data))) else: content = str(data) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": "Successfully retrieved document content", "done": True, }, } ) return f"**Document Content:**\n\n{content}" except Exception as e: error_msg = f"Failed to retrieve document content: {str(e)}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": error_msg, "done": True}, } ) return error_msg

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cdmx-in/goodday-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server