Skip to main content
Glama

dag_task_duration

Analyze task durations in Apache Airflow DAG runs to identify performance bottlenecks and optimize workflow execution times.

Instructions

[Tool Role]: Analyzes task durations within a DAG run.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
dag_idYes
dag_run_idNo

Implementation Reference

  • The main handler function implementing dag_task_duration tool. Fetches task instances for a specific DAG run (or latest if not specified), calculates duration for each completed task, and returns task-level performance metrics.
    @mcp.tool()
    async def dag_task_duration(dag_id: str, dag_run_id: Optional[str] = None) -> Dict[str, Any]:
        """[Tool Role]: Analyzes task durations within a DAG run."""
        if not dag_id:
            raise ValueError("dag_id must not be empty")
        
        if not dag_run_id:
            # Get the latest run
            resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date")
            resp.raise_for_status()
            runs = resp.json().get("dag_runs", [])
            if not runs:
                return {"error": f"No DAG runs found for DAG {dag_id}"}
            dag_run_id = runs[0]["run_id"]
        
        resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances")
        resp.raise_for_status()
        data = resp.json()
        
        task_durations = []
        for task in data.get("task_instances", []):
            start_date = task.get("start_date")
            end_date = task.get("end_date")
            if start_date and end_date:
                from datetime import datetime
                start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
                end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
                duration_seconds = (end - start).total_seconds()
                task_durations.append({
                    "task_id": task.get("task_id"),
                    "duration_seconds": duration_seconds,
                    "state": task.get("state"),
                    "start_date": start_date,
                    "end_date": end_date
                })
        
        return {
            "dag_id": dag_id,
            "dag_run_id": dag_run_id,
            "task_durations": task_durations,
            "total_tasks": len(task_durations)
        }
  • The registration point for v1 API that sets the v1-specific HTTP request function and calls register_common_tools(mcp), which registers dag_task_duration among other common tools.
    def register_tools(mcp):
        """Register v1 tools by importing common tools with v1 request function."""
        
        logger.info("Initializing MCP server for Airflow API v1")
        logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
        
        # Set the global request function to v1
        common_tools.airflow_request = airflow_request_v1
        
        # Register all 56 common tools (includes management tools)
        common_tools.register_common_tools(mcp)
        
        # V1 has no exclusive tools - all tools are shared with v2
        
        logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
  • The registration point for v2 API that sets the v2-specific HTTP request function and calls register_common_tools(mcp), which registers dag_task_duration among other common tools.
    def register_tools(mcp):
        """Register v2 tools: common tools + v2-exclusive asset tools."""
        
        logger.info("Initializing MCP server for Airflow API v2")
        logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
        
        # Set the global request function to v2
        common_tools.airflow_request = airflow_request_v2
        
        # Register all 43 common tools
        common_tools.register_common_tools(mcp)
  • In mcp_main.py, calls v1_tools.register_tools(mcp) to register tools for v1 API (includes dag_task_duration).
        logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
        from mcp_airflow_api.tools import v1_tools
        v1_tools.register_tools(mcp)
    elif api_version == "v2":
  • In mcp_main.py, calls v2_tools.register_tools(mcp) to register tools for v2 API (includes dag_task_duration).
    logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
    from mcp_airflow_api.tools import v2_tools
    v2_tools.register_tools(mcp)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/call518/MCP-Airflow-API'

If you have feedback or need assistance with the MCP directory API, please join our Discord server