dag_run_duration

Analyze DAG run durations and performance metrics to monitor workflow execution times and identify optimization opportunities in Apache Airflow.

Instructions

[Tool Role]: Analyzes DAG run durations and performance metrics.

Input Schema

Name     Required   Description   Default
dag_id   Yes        -             -
limit    No         -             10
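
For example, a call might pass arguments like the following (the DAG ID is invented; when limit is omitted, the handler below defaults it to 10):

    arguments = {"dag_id": "example_etl_daily", "limit": 20}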

Output Schema

No output fields are defined.
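
For orientation, a sketch of the payload the handler below actually returns (the shape follows the implementation; all values are invented):

    run = {"run_id": "scheduled__2024-05-01T00:00:00+00:00", "duration_seconds": 750.0,
           "state": "success", "execution_date": "2024-05-01T00:00:00+00:00"}
    example_output = {
        "dag_id": "example_etl_daily",
        "run_durations": [run],
        "statistics": {
            "average_duration_seconds": 750.0,
            "total_analyzed_runs": 1,
            "fastest_run": run,
            "slowest_run": run,
        },
    }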

Implementation Reference

  • Core handler function for the `dag_run_duration` tool. Fetches recent DAG runs via the Airflow API, calculates per-run execution durations, and computes performance statistics including the average duration and the fastest and slowest runs.
    @mcp.tool()
    async def dag_run_duration(dag_id: str, limit: int = 10) -> Dict[str, Any]:
        """[Tool Role]: Analyzes DAG run durations and performance metrics."""
        from datetime import datetime

        if not dag_id:
            raise ValueError("dag_id must not be empty")

        # Fetch the most recent runs for this DAG, newest first.
        resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit={limit}&order_by=-execution_date")
        resp.raise_for_status()
        data = resp.json()

        runs = data.get("dag_runs", [])
        durations = []

        for run in runs:
            start_date = run.get("start_date")
            end_date = run.get("end_date")
            # Only finished runs carry both timestamps; queued or running runs are skipped.
            if start_date and end_date:
                start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
                end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
                duration_seconds = (end - start).total_seconds()
                durations.append({
                    "run_id": run.get("run_id"),
                    "duration_seconds": duration_seconds,
                    "state": run.get("state"),
                    "execution_date": run.get("execution_date")
                })

        avg_duration = sum(d["duration_seconds"] for d in durations) / len(durations) if durations else 0

        return {
            "dag_id": dag_id,
            "run_durations": durations,
            "statistics": {
                "average_duration_seconds": avg_duration,
                "total_analyzed_runs": len(durations),
                "fastest_run": min(durations, key=lambda x: x["duration_seconds"]) if durations else None,
                "slowest_run": max(durations, key=lambda x: x["duration_seconds"]) if durations else None
            }
        }
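    The duration math is plain timezone-aware datetime subtraction; the replace('Z', '+00:00') shim exists because datetime.fromisoformat only accepts a trailing 'Z' from Python 3.11 onward. A standalone sanity check with invented timestamps:

        from datetime import datetime

        start = datetime.fromisoformat("2024-05-01T00:00:00Z".replace("Z", "+00:00"))
        end = datetime.fromisoformat("2024-05-01T00:12:30Z".replace("Z", "+00:00"))
        print((end - start).total_seconds())  # 750.0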
  • Registration entry point for v1 API tools. Sets the v1-specific `airflow_request` function and calls `common_tools.register_common_tools(mcp)` which defines and registers the `dag_run_duration` handler.
    def register_tools(mcp):
        """Register v1 tools by importing common tools with v1 request function."""
        
        logger.info("Initializing MCP server for Airflow API v1")
        logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
        
        # Set the global request function to v1
        common_tools.airflow_request = airflow_request_v1
        
        # Register all 56 common tools (includes management tools)
        common_tools.register_common_tools(mcp)
        
        # V1 has no exclusive tools - all tools are shared with v2
        
        logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
  • Registration entry point for v2 API tools. Sets the v2-specific `airflow_request` function and calls `common_tools.register_common_tools(mcp)` which defines and registers the `dag_run_duration` handler.
    def register_tools(mcp):
        """Register v2 tools: common tools + v2-exclusive asset tools."""
        
        logger.info("Initializing MCP server for Airflow API v2")
        logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
        
        # Set the global request function to v2
        common_tools.airflow_request = airflow_request_v2
        
        # Register all 43 common tools
        common_tools.register_common_tools(mcp)
        
        # Add V2-exclusive tools (2 tools)
        @mcp.tool()
        async def list_assets(limit: int = 20, offset: int = 0,
                             uri_pattern: Optional[str] = None) -> Dict[str, Any]:
            """
            [V2 New] List all assets in the system for data-aware scheduling.
            
            Assets are a key feature in Airflow 3.0 for data-aware scheduling.
            They enable workflows to be triggered by data changes rather than time schedules.
            
            Args:
                limit: Maximum number of assets to return (default: 20)
                offset: Number of assets to skip for pagination (default: 0)
                uri_pattern: Filter assets by URI pattern (optional)
                
            Returns:
                Dict containing assets list, pagination info, and metadata
            """
            params = {'limit': limit, 'offset': offset}
            if uri_pattern:
                params['uri_pattern'] = uri_pattern
                
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            
            resp = await airflow_request_v2("GET", f"/assets?{query_string}")
            resp.raise_for_status()
            data = resp.json()
            
            return {
                "assets": data.get("assets", []),
                "total_entries": data.get("total_entries", 0),
                "limit": limit,
                "offset": offset,
                "api_version": "v2",
                "feature": "assets"
            }
    
        @mcp.tool()
        async def list_asset_events(limit: int = 20, offset: int = 0,
                                   asset_uri: Optional[str] = None,
                                   source_dag_id: Optional[str] = None) -> Dict[str, Any]:
            """
            [V2 New] List asset events for data lineage tracking.
            
            Asset events track when assets are created or updated by DAGs.
            This enables data lineage tracking and data-aware scheduling in Airflow 3.0.
            
            Args:
                limit: Maximum number of events to return (default: 20)
                offset: Number of events to skip for pagination (default: 0)
                asset_uri: Filter events by specific asset URI (optional)
                source_dag_id: Filter events by source DAG that produced the event (optional)
                
            Returns:
                Dict containing asset events list, pagination info, and metadata
            """
            params = {'limit': limit, 'offset': offset}
            if asset_uri:
                params['asset_uri'] = asset_uri
            if source_dag_id:
                params['source_dag_id'] = source_dag_id
                
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            
            resp = await airflow_request_v2("GET", f"/assets/events?{query_string}")
            resp.raise_for_status()
            data = resp.json()
            
            return {
                "asset_events": data.get("asset_events", []),
                "total_entries": data.get("total_entries", 0),
                "limit": limit,
                "offset": offset,
                "api_version": "v2",
                "feature": "asset_events"
            }
    
        logger.info("Registered all Airflow API v2 tools (43 common + 2 assets + 4 management = 49 tools)")
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden of behavioral disclosure. It mentions analysis but does not specify whether this is a read-only operation, what data it returns, or any operational caveats such as rate limits or required permissions. The description is too vague to inform the agent about behavioral traits beyond the basic action.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, efficient sentence that directly states the tool's role without unnecessary words. It is front-loaded and easy to parse, though it could be more informative. The brevity is appropriate; the problem is under-specification, not waste.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 3/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given that the tool has an output schema, the description does not need to explain return values. However, with no annotations, low schema coverage, and two parameters, the description is incomplete: it lacks details on behavior, parameter meaning, and usage context. It meets a minimal baseline but leaves clear gaps for a tool of this complexity.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 0%, meaning parameters 'dag_id' and 'limit' are undocumented in the schema. The description adds no semantic information about these parameters, such as what 'dag_id' refers to or how 'limit' affects results. It fails to compensate for the lack of schema documentation, leaving parameters largely unexplained.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.
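
For illustration, one way the 0% schema coverage could be closed, assuming the server is built on FastMCP, which can lift pydantic Field descriptions into the generated JSON schema (the parameter descriptions below are invented):

    from typing import Annotated, Any, Dict

    from pydantic import Field

    @mcp.tool()
    async def dag_run_duration(
        dag_id: Annotated[str, Field(description="ID of the DAG whose recent runs to analyze.")],
        limit: Annotated[int, Field(ge=1, description="Maximum number of recent runs to fetch.")] = 10,
    ) -> Dict[str, Any]:
        ...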

Purpose: 3/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description states the tool 'Analyzes DAG run durations and performance metrics,' which provides a clear verb ('Analyzes') and resource ('DAG run durations and performance metrics'). However, it does not differentiate the tool from siblings like 'dag_task_duration' or 'all_dag_event_summary,' so the comparison is left vague. The purpose is understandable but lacks specificity about scope or unique function.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description offers no guidance on when to use this tool versus alternatives. It does not mention context, prerequisites, or exclusions, such as how it differs from 'dag_task_duration' or 'failed_dags.' Without any usage instructions, the agent must infer from the name and schema alone, which is insufficient for optimal tool selection.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
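
For illustration, the kind of guidance being requested might read: "Use dag_run_duration to compare wall-clock times across recent runs of one DAG; use dag_task_duration to break a single run down by task; use failed_dags when you only need to enumerate failures." (The sibling-tool behavior here is inferred from their names, not from documentation.)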

If you have feedback or need assistance with the MCP directory API, please join our Discord server