get_tasks
Retrieve current tasks running in an OpenSearch cluster to monitor operations and manage cluster performance.
Instructions
Get current tasks in cluster
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- The async handler function for the 'get_tasks' MCP tool. It fetches current tasks from the OpenSearch cluster using cat.tasks, filters duplicates by task type, and returns unique tasks as TextContent.@mcp.tool(description="Get current tasks in cluster") async def get_tasks() -> list[TextContent]: """ Get current tasks running in the cluster. Filters duplicate task types to show only unique operations. """ self.logger.info("Fetching cluster tasks...") try: response = self.es_client.cat.tasks(v=True) lines = response.split('\n') seen_tasks = set() filtered_lines = [] for line in lines: if not line.strip(): continue task_type = line.split()[0] if task_type not in seen_tasks: seen_tasks.add(task_type) filtered_lines.append(line) if filtered_lines: return [TextContent(type="text", text='\n'.join(filtered_lines))] else: return [TextContent(type="text", text="No tasks currently running in the cluster.")] except Exception as e: self.logger.error(f"Error fetching tasks: {e}") return [TextContent(type="text", text=f"Error: {str(e)}")]
- src/opensearch_mcp_server/server.py:41-41 (registration)Registration of AdminClusterTools, which includes the get_tasks tool, by calling register_tools on the MCP instance.admin_cluster_tools.register_tools(self.mcp)