Skip to main content
Glama
knishioka

Treasure Data MCP Server

by knishioka

td_get_attempt_tasks

Debug workflow failures by retrieving task breakdowns to identify failed or slow steps, their status, timing, and dependencies.

Instructions

Get task breakdown to find which step failed or is slow in workflow.

Shows all individual tasks (steps) within a workflow execution with their
status, timing, and dependencies. Essential for debugging failed workflows.

Common scenarios:
- Find exactly which task/query failed in a complex workflow
- Identify slow-running tasks causing delays
- Understand task execution order and dependencies
- Debug data processing issues at task level

Returns task list with names, states, timing, and failure details.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
attempt_idYes

Implementation Reference

  • The handler function for td_get_attempt_tasks tool. It retrieves tasks for the given attempt_id using the TD client, processes each task to extract relevant details (hierarchy, timing, dependencies, type, errors), computes statistics, and returns a structured response.
    async def td_get_attempt_tasks(attempt_id: str) -> dict[str, Any]:
        """Get task breakdown to find which step failed or is slow in workflow.
    
        Shows all individual tasks (steps) within a workflow execution with their
        status, timing, and dependencies. Essential for debugging failed workflows.
    
        Common scenarios:
        - Find exactly which task/query failed in a complex workflow
        - Identify slow-running tasks causing delays
        - Understand task execution order and dependencies
        - Debug data processing issues at task level
    
        Returns task list with names, states, timing, and failure details.
        """
        if not attempt_id or not attempt_id.strip():
            return _format_error_response("Attempt ID cannot be empty")
    
        client = _create_client(include_workflow=True)
        if isinstance(client, dict):
            return client
    
        try:
            tasks = client.get_attempt_tasks(attempt_id)
    
            # Process tasks to create hierarchy and statistics
            task_list = []
            task_stats = {
                "total": len(tasks),
                "success": 0,
                "failed": 0,
                "running": 0,
                "blocked": 0,
                "other": 0,
            }
    
            for task in tasks:
                task_info = {
                    "id": task.id,
                    "name": task.full_name,
                    "state": task.state,
                    "is_group": task.is_group,
                }
    
                # Add parent info for hierarchy
                if task.parent_id:
                    task_info["parent_id"] = task.parent_id
    
                # Add timing info
                if task.started_at:
                    task_info["started_at"] = task.started_at
                if task.updated_at:
                    task_info["updated_at"] = task.updated_at
    
                # Add dependencies
                if task.upstreams:
                    task_info["depends_on"] = task.upstreams
    
                # Add non-sensitive config
                if task.config:
                    # Extract key task type info
                    if "td>" in task.config:
                        task_info["type"] = "td_query"
                        if "database" in task.config["td>"]:
                            task_info["database"] = task.config["td>"]["database"]
                    elif "py>" in task.config:
                        task_info["type"] = "python"
                    elif "sh>" in task.config:
                        task_info["type"] = "shell"
                    else:
                        task_info["type"] = "other"
    
                # Add error info if failed
                if task.error and task.state in ["failed", "error"]:
                    task_info["error"] = task.error
    
                # Update statistics
                if task.state == "success":
                    task_stats["success"] += 1
                elif task.state in ["failed", "error"]:
                    task_stats["failed"] += 1
                elif task.state == "running":
                    task_stats["running"] += 1
                elif task.state == "blocked":
                    task_stats["blocked"] += 1
                else:
                    task_stats["other"] += 1
    
                task_list.append(task_info)
    
            return {
                "attempt_id": attempt_id,
                "tasks": task_list,
                "statistics": task_stats,
            }
    
        except Exception as e:
            return _format_error_response(f"Failed to get attempt tasks: {str(e)}")
  • The registration function that sets up the MCP instance and globals, then registers all execution tools including td_get_attempt_tasks using mcp.tool() decorators.
    def register_execution_tools(mcp_instance, create_client_func, format_error_func):
        """Register execution tools with the provided MCP instance."""
        global mcp, _create_client, _format_error_response
        mcp = mcp_instance
        _create_client = create_client_func
        _format_error_response = format_error_func
    
        # Register all tools
        mcp.tool()(td_get_session)
        mcp.tool()(td_list_sessions)
        mcp.tool()(td_get_attempt)
        mcp.tool()(td_get_attempt_tasks)
        mcp.tool()(td_analyze_execution)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/knishioka/td-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server