Skip to main content
Glama

dag_task_duration

Analyze task durations in Apache Airflow DAG runs to identify performance bottlenecks and optimize workflow execution times.

Instructions

[Tool Role]: Analyzes task durations within a DAG run.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
dag_idYes
dag_run_idNo

Output Schema

TableJSON Schema
NameRequiredDescriptionDefault

No arguments

Implementation Reference

  • The main handler function implementing dag_task_duration tool. Fetches task instances for a specific DAG run (or latest if not specified), calculates duration for each completed task, and returns task-level performance metrics.
    @mcp.tool()
    async def dag_task_duration(dag_id: str, dag_run_id: Optional[str] = None) -> Dict[str, Any]:
        """[Tool Role]: Analyzes task durations within a DAG run."""
        if not dag_id:
            raise ValueError("dag_id must not be empty")
        
        if not dag_run_id:
            # Get the latest run
            resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date")
            resp.raise_for_status()
            runs = resp.json().get("dag_runs", [])
            if not runs:
                return {"error": f"No DAG runs found for DAG {dag_id}"}
            dag_run_id = runs[0]["run_id"]
        
        resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances")
        resp.raise_for_status()
        data = resp.json()
        
        task_durations = []
        for task in data.get("task_instances", []):
            start_date = task.get("start_date")
            end_date = task.get("end_date")
            if start_date and end_date:
                from datetime import datetime
                start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
                end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
                duration_seconds = (end - start).total_seconds()
                task_durations.append({
                    "task_id": task.get("task_id"),
                    "duration_seconds": duration_seconds,
                    "state": task.get("state"),
                    "start_date": start_date,
                    "end_date": end_date
                })
        
        return {
            "dag_id": dag_id,
            "dag_run_id": dag_run_id,
            "task_durations": task_durations,
            "total_tasks": len(task_durations)
        }
  • The registration point for v1 API that sets the v1-specific HTTP request function and calls register_common_tools(mcp), which registers dag_task_duration among other common tools.
    def register_tools(mcp):
        """Register v1 tools by importing common tools with v1 request function."""
        
        logger.info("Initializing MCP server for Airflow API v1")
        logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
        
        # Set the global request function to v1
        common_tools.airflow_request = airflow_request_v1
        
        # Register all 56 common tools (includes management tools)
        common_tools.register_common_tools(mcp)
        
        # V1 has no exclusive tools - all tools are shared with v2
        
        logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
  • The registration point for v2 API that sets the v2-specific HTTP request function and calls register_common_tools(mcp), which registers dag_task_duration among other common tools.
    def register_tools(mcp):
        """Register v2 tools: common tools + v2-exclusive asset tools."""
        
        logger.info("Initializing MCP server for Airflow API v2")
        logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
        
        # Set the global request function to v2
        common_tools.airflow_request = airflow_request_v2
        
        # Register all 43 common tools
        common_tools.register_common_tools(mcp)
  • In mcp_main.py, calls v1_tools.register_tools(mcp) to register tools for v1 API (includes dag_task_duration).
        logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
        from mcp_airflow_api.tools import v1_tools
        v1_tools.register_tools(mcp)
    elif api_version == "v2":
  • In mcp_main.py, calls v2_tools.register_tools(mcp) to register tools for v2 API (includes dag_task_duration).
    logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
    from mcp_airflow_api.tools import v2_tools
    v2_tools.register_tools(mcp)
Behavior2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries the full burden of behavioral disclosure. It states the tool 'analyzes' task durations, which implies a read-only operation, but doesn't specify whether it requires authentication, has rate limits, returns aggregated data or raw metrics, or how it handles errors. This is inadequate for a tool with two parameters and an output schema.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, efficient sentence that directly states the tool's role without unnecessary words. It's front-loaded with the core purpose, making it easy to parse, though its brevity contributes to gaps in other dimensions.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool has two parameters with 0% schema coverage, no annotations, and an output schema (which relieves some burden), the description is incomplete. It doesn't explain parameter semantics, behavioral traits like data format or access requirements, or how it differs from siblings, making it insufficient for effective tool selection and invocation.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 0%, so the schema provides no parameter descriptions. The tool description doesn't mention any parameters, failing to explain what 'dag_id' and 'dag_run_id' represent, their formats, or how they affect the analysis. This leaves both parameters undocumented, with the description adding no value beyond the schema.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose3/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description states the tool 'analyzes task durations within a DAG run', which provides a clear verb ('analyzes') and resource ('task durations within a DAG run'). However, it doesn't distinguish this from sibling tools like 'dag_run_duration' or 'list_task_instances_all', leaving ambiguity about how this analysis differs from other duration-related tools.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No explicit guidance is provided on when to use this tool versus alternatives. The description doesn't mention prerequisites, context, or exclusions, and it doesn't reference sibling tools like 'dag_run_duration' for comparison, leaving the agent to infer usage from the tool name alone.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/call518/MCP-Airflow-API'

If you have feedback or need assistance with the MCP directory API, please join our Discord server