# dag_task_duration
Analyze task durations in Apache Airflow DAG runs to identify performance bottlenecks and optimize workflow execution times.
## Instructions
[Tool Role]: Analyzes task durations within a DAG run.
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | Yes | ID of the DAG whose task durations are analyzed; must not be empty | |
| dag_run_id | No | Specific DAG run to analyze; when omitted, the latest run is used | None (latest run) |
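An illustrative request payload for this tool (the `dag_id` and `dag_run_id` values here are hypothetical examples, not values from the source):

```json
{
  "dag_id": "example_etl",
  "dag_run_id": "manual__2024-01-01T00:00:00+00:00"
}
```

Omitting `dag_run_id` analyzes the most recent run of the DAG.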
## Implementation Reference
- The main handler function implementing the `dag_task_duration` tool. It fetches task instances for a specific DAG run (or the latest run if none is specified), calculates the duration of each completed task, and returns task-level performance metrics.

  ```python
  @mcp.tool()
  async def dag_task_duration(dag_id: str, dag_run_id: Optional[str] = None) -> Dict[str, Any]:
      """[Tool Role]: Analyzes task durations within a DAG run."""
      if not dag_id:
          raise ValueError("dag_id must not be empty")
      if not dag_run_id:
          # Get the latest run
          resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date")
          resp.raise_for_status()
          runs = resp.json().get("dag_runs", [])
          if not runs:
              return {"error": f"No DAG runs found for DAG {dag_id}"}
          dag_run_id = runs[0]["run_id"]
      resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances")
      resp.raise_for_status()
      data = resp.json()
      task_durations = []
      for task in data.get("task_instances", []):
          start_date = task.get("start_date")
          end_date = task.get("end_date")
          if start_date and end_date:
              from datetime import datetime
              start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
              end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
              duration_seconds = (end - start).total_seconds()
              task_durations.append({
                  "task_id": task.get("task_id"),
                  "duration_seconds": duration_seconds,
                  "state": task.get("state"),
                  "start_date": start_date,
                  "end_date": end_date
              })
      return {
          "dag_id": dag_id,
          "dag_run_id": dag_run_id,
          "task_durations": task_durations,
          "total_tasks": len(task_durations)
      }
  ```
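The core of the handler is the timestamp arithmetic: Airflow's REST API returns ISO-8601 timestamps, sometimes with a `Z` suffix that `datetime.fromisoformat()` rejects on Python versions before 3.11, hence the `replace('Z', '+00:00')` normalization. A minimal, standalone sketch of that calculation (the `task_duration_seconds` helper name is ours, not from the source):

```python
from datetime import datetime


def task_duration_seconds(start_date: str, end_date: str) -> float:
    """Elapsed seconds between two Airflow API timestamps.

    Normalizes a trailing 'Z' (UTC) to '+00:00' so that
    datetime.fromisoformat() accepts it on Python < 3.11.
    """
    start = datetime.fromisoformat(start_date.replace("Z", "+00:00"))
    end = datetime.fromisoformat(end_date.replace("Z", "+00:00"))
    return (end - start).total_seconds()


# A 2.5-minute task run:
duration = task_duration_seconds("2024-01-01T00:00:00Z", "2024-01-01T00:02:30Z")
print(duration)  # 150.0
```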
- `src/mcp_airflow_api/tools/v1_tools.py:13-28` (registration): The registration point for the v1 API. It sets the v1-specific HTTP request function and calls `register_common_tools(mcp)`, which registers `dag_task_duration` among the other common tools.

  ```python
  def register_tools(mcp):
      """Register v1 tools by importing common tools with v1 request function."""
      logger.info("Initializing MCP server for Airflow API v1")
      logger.info("Loading Airflow API v1 tools (Airflow 2.x)")

      # Set the global request function to v1
      common_tools.airflow_request = airflow_request_v1

      # Register all 56 common tools (includes management tools)
      common_tools.register_common_tools(mcp)

      # V1 has no exclusive tools - all tools are shared with v2
      logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
  ```
- `src/mcp_airflow_api/tools/v2_tools.py:14-25` (registration): The registration point for the v2 API. It sets the v2-specific HTTP request function and calls `register_common_tools(mcp)`, which registers `dag_task_duration` among the other common tools.

  ```python
  def register_tools(mcp):
      """Register v2 tools: common tools + v2-exclusive asset tools."""
      logger.info("Initializing MCP server for Airflow API v2")
      logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")

      # Set the global request function to v2
      common_tools.airflow_request = airflow_request_v2

      # Register all 43 common tools
      common_tools.register_common_tools(mcp)
  ```
- `src/mcp_airflow_api/mcp_main.py:383-386` (registration): In `mcp_main.py`, calls `v1_tools.register_tools(mcp)` to register tools for the v1 API (includes `dag_task_duration`).

  ```python
  logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
  from mcp_airflow_api.tools import v1_tools
  v1_tools.register_tools(mcp)
  elif api_version == "v2":
  ```
- `src/mcp_airflow_api/mcp_main.py:387-389` (registration): In `mcp_main.py`, calls `v2_tools.register_tools(mcp)` to register tools for the v2 API (includes `dag_task_duration`).

  ```python
  logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
  from mcp_airflow_api.tools import v2_tools
  v2_tools.register_tools(mcp)
  ```
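Taken together, the two excerpts from `mcp_main.py` implement a version dispatch: exactly one of the registrars runs, chosen by `api_version`. A minimal runnable sketch of that pattern, with stub registrars standing in for `v1_tools.register_tools` / `v2_tools.register_tools` and a plain dict standing in for the MCP server object (all names here are illustrative, not the project's API):

```python
# Stubs standing in for v1_tools.register_tools / v2_tools.register_tools.
def register_v1(mcp: dict) -> None:
    mcp["tools"].append("v1")  # v1 registers the 56 common tools


def register_v2(mcp: dict) -> None:
    mcp["tools"].append("v2")  # v2 registers common + asset tools


def load_tools(mcp: dict, api_version: str) -> None:
    """Dispatch registration on the configured Airflow API version."""
    if api_version == "v1":
        register_v1(mcp)
    elif api_version == "v2":
        register_v2(mcp)
    else:
        raise ValueError(f"Unsupported Airflow API version: {api_version}")


server = {"tools": []}
load_tools(server, "v2")
print(server["tools"])  # ['v2']
```

Because both registrars funnel into the same common-tool registration, `dag_task_duration` is available under either version; only the underlying HTTP request function differs.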