td_get_attempt_tasks
Retrieve detailed task breakdowns from workflow executions to identify failed steps, analyze performance bottlenecks, and understand task dependencies for debugging data processing issues.
Instructions
Get task breakdown to find which step failed or is slow in workflow.
Shows all individual tasks (steps) within a workflow execution with their
status, timing, and dependencies. Essential for debugging failed workflows.
Common scenarios:
- Find exactly which task/query failed in a complex workflow
- Identify slow-running tasks causing delays
- Understand task execution order and dependencies
- Debug data processing issues at task level
Returns task list with names, states, timing, and failure details.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| attempt_id | Yes |
Implementation Reference
- td_mcp_server/execution_tools.py:219-316 (handler)The handler function that fetches tasks for the given attempt_id using the TD client, processes them into a structured list with statistics, task details, types, errors, and dependencies.async def td_get_attempt_tasks(attempt_id: str) -> dict[str, Any]: """Get task breakdown to find which step failed or is slow in workflow. Shows all individual tasks (steps) within a workflow execution with their status, timing, and dependencies. Essential for debugging failed workflows. Common scenarios: - Find exactly which task/query failed in a complex workflow - Identify slow-running tasks causing delays - Understand task execution order and dependencies - Debug data processing issues at task level Returns task list with names, states, timing, and failure details. """ if not attempt_id or not attempt_id.strip(): return _format_error_response("Attempt ID cannot be empty") client = _create_client(include_workflow=True) if isinstance(client, dict): return client try: tasks = client.get_attempt_tasks(attempt_id) # Process tasks to create hierarchy and statistics task_list = [] task_stats = { "total": len(tasks), "success": 0, "failed": 0, "running": 0, "blocked": 0, "other": 0, } for task in tasks: task_info = { "id": task.id, "name": task.full_name, "state": task.state, "is_group": task.is_group, } # Add parent info for hierarchy if task.parent_id: task_info["parent_id"] = task.parent_id # Add timing info if task.started_at: task_info["started_at"] = task.started_at if task.updated_at: task_info["updated_at"] = task.updated_at # Add dependencies if task.upstreams: task_info["depends_on"] = task.upstreams # Add non-sensitive config if task.config: # Extract key task type info if "td>" in task.config: task_info["type"] = "td_query" if "database" in task.config["td>"]: task_info["database"] = task.config["td>"]["database"] elif "py>" in task.config: task_info["type"] = "python" elif "sh>" in task.config: task_info["type"] = "shell" else: task_info["type"] = "other" # Add error info if failed if task.error and task.state in ["failed", "error"]: task_info["error"] = task.error # Update statistics if task.state == "success": task_stats["success"] += 1 elif task.state in ["failed", "error"]: task_stats["failed"] += 1 elif task.state == "running": task_stats["running"] += 1 elif task.state == "blocked": task_stats["blocked"] += 1 else: task_stats["other"] += 1 task_list.append(task_info) return { "attempt_id": attempt_id, "tasks": task_list, "statistics": task_stats, } except Exception as e: return _format_error_response(f"Failed to get attempt tasks: {str(e)}")
- td_mcp_server/execution_tools.py:18-31 (registration)The registration function that sets up globals and registers td_get_attempt_tasks (line 29) among other execution tools using mcp.tool().def register_execution_tools(mcp_instance, create_client_func, format_error_func): """Register execution tools with the provided MCP instance.""" global mcp, _create_client, _format_error_response mcp = mcp_instance _create_client = create_client_func _format_error_response = format_error_func # Register all tools mcp.tool()(td_get_session) mcp.tool()(td_list_sessions) mcp.tool()(td_get_attempt) mcp.tool()(td_get_attempt_tasks) mcp.tool()(td_analyze_execution)