
dag_run_duration

Analyze DAG run durations and performance metrics to monitor workflow execution times and identify optimization opportunities in Apache Airflow.

Instructions

[Tool Role]: Analyzes DAG run durations and performance metrics.

Input Schema

Name     Required   Description                                      Default
dag_id   Yes        Identifier of the DAG whose runs are analyzed    (none)
limit    No         Maximum number of recent runs to analyze         10
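
A minimal sketch of calling the handler directly from async Python code (the DAG id below is a placeholder, not one from the source); the full handler appears in the Implementation Reference below:

    # Hypothetical call from within an async context; "example_dag" is a placeholder.
    result = await dag_run_duration(dag_id="example_dag", limit=5)
    stats = result["statistics"]
    print(stats["average_duration_seconds"], stats["total_analyzed_runs"])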

Implementation Reference

  • Core handler function for the `dag_run_duration` tool. Fetches recent DAG runs via Airflow API, calculates execution durations, and computes performance statistics including average, fastest, and slowest runs.
    @mcp.tool()
    async def dag_run_duration(dag_id: str, limit: int = 10) -> Dict[str, Any]:
        """[Tool Role]: Analyzes DAG run durations and performance metrics."""
        if not dag_id:
            raise ValueError("dag_id must not be empty")
        
        resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit={limit}&order_by=-execution_date")
        resp.raise_for_status()
        data = resp.json()
        
        runs = data.get("dag_runs", [])
        durations = []
        
        from datetime import datetime  # used to parse the ISO 8601 timestamps returned by the API

        for run in runs:
            start_date = run.get("start_date")
            end_date = run.get("end_date")
            if start_date and end_date:
                # Airflow returns timestamps with a trailing 'Z'; swap it for an explicit
                # UTC offset so datetime.fromisoformat() can parse them.
                start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
                end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
                duration_seconds = (end - start).total_seconds()
                durations.append({
                    "run_id": run.get("run_id"),
                    "duration_seconds": duration_seconds,
                    "state": run.get("state"),
                    "execution_date": run.get("execution_date")
                })
        
        avg_duration = sum(d["duration_seconds"] for d in durations) / len(durations) if durations else 0
        
        return {
            "dag_id": dag_id,
            "run_durations": durations,
            "statistics": {
                "average_duration_seconds": avg_duration,
                "total_analyzed_runs": len(durations),
                "fastest_run": min(durations, key=lambda x: x["duration_seconds"]) if durations else None,
                "slowest_run": max(durations, key=lambda x: x["duration_seconds"]) if durations else None
            }
        }
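  • (Illustrative, not from the repository) The handler above delegates HTTP calls to an `airflow_request` helper whose definition is not shown on this page. A minimal sketch of such a helper, assuming httpx with basic authentication; the environment variable names and defaults are assumptions:
    import os

    import httpx

    async def airflow_request(method: str, path: str) -> httpx.Response:
        # Hypothetical helper: send an authenticated request to the Airflow REST API.
        # AIRFLOW_API_URL / AIRFLOW_USERNAME / AIRFLOW_PASSWORD are assumed names.
        base_url = os.environ.get("AIRFLOW_API_URL", "http://localhost:8080/api/v1")
        auth = (os.environ.get("AIRFLOW_USERNAME", "airflow"),
                os.environ.get("AIRFLOW_PASSWORD", "airflow"))
        async with httpx.AsyncClient(auth=auth, timeout=30.0) as client:
            return await client.request(method, f"{base_url}{path}")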
  • Registration entry point for v1 API tools. Sets the v1-specific `airflow_request` function and calls `common_tools.register_common_tools(mcp)` which defines and registers the `dag_run_duration` handler.
    def register_tools(mcp):
        """Register v1 tools by importing common tools with v1 request function."""
        
        logger.info("Initializing MCP server for Airflow API v1")
        logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
        
        # Set the global request function to v1
        common_tools.airflow_request = airflow_request_v1
        
        # Register all 56 common tools (includes management tools)
        common_tools.register_common_tools(mcp)
        
        # V1 has no exclusive tools - all tools are shared with v2
        
        logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
  • Registration entry point for v2 API tools. Sets the v2-specific `airflow_request` function and calls `common_tools.register_common_tools(mcp)` which defines and registers the `dag_run_duration` handler.
    def register_tools(mcp):
        """Register v2 tools: common tools + v2-exclusive asset tools."""
        
        logger.info("Initializing MCP server for Airflow API v2")
        logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
        
        # Set the global request function to v2
        common_tools.airflow_request = airflow_request_v2
        
        # Register all 43 common tools
        common_tools.register_common_tools(mcp)
        
        # Add V2-exclusive tools (2 tools)
        @mcp.tool()
        async def list_assets(limit: int = 20, offset: int = 0,
                             uri_pattern: Optional[str] = None) -> Dict[str, Any]:
            """
            [V2 New] List all assets in the system for data-aware scheduling.
            
            Assets are a key feature in Airflow 3.0 for data-aware scheduling.
            They enable workflows to be triggered by data changes rather than time schedules.
            
            Args:
                limit: Maximum number of assets to return (default: 20)
                offset: Number of assets to skip for pagination (default: 0)
                uri_pattern: Filter assets by URI pattern (optional)
                
            Returns:
                Dict containing assets list, pagination info, and metadata
            """
            params = {'limit': limit, 'offset': offset}
            if uri_pattern:
                params['uri_pattern'] = uri_pattern
                
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            
            resp = await airflow_request_v2("GET", f"/assets?{query_string}")
            resp.raise_for_status()
            data = resp.json()
            
            return {
                "assets": data.get("assets", []),
                "total_entries": data.get("total_entries", 0),
                "limit": limit,
                "offset": offset,
                "api_version": "v2",
                "feature": "assets"
            }
    
        @mcp.tool()
        async def list_asset_events(limit: int = 20, offset: int = 0,
                                   asset_uri: Optional[str] = None,
                                   source_dag_id: Optional[str] = None) -> Dict[str, Any]:
            """
            [V2 New] List asset events for data lineage tracking.
            
            Asset events track when assets are created or updated by DAGs.
            This enables data lineage tracking and data-aware scheduling in Airflow 3.0.
            
            Args:
                limit: Maximum number of events to return (default: 20)
                offset: Number of events to skip for pagination (default: 0)
                asset_uri: Filter events by specific asset URI (optional)
                source_dag_id: Filter events by source DAG that produced the event (optional)
                
            Returns:
                Dict containing asset events list, pagination info, and metadata
            """
            params = {'limit': limit, 'offset': offset}
            if asset_uri:
                params['asset_uri'] = asset_uri
            if source_dag_id:
                params['source_dag_id'] = source_dag_id
                
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            
            resp = await airflow_request_v2("GET", f"/assets/events?{query_string}")
            resp.raise_for_status()
            data = resp.json()
            
            return {
                "asset_events": data.get("asset_events", []),
                "total_entries": data.get("total_entries", 0),
                "limit": limit,
                "offset": offset,
                "api_version": "v2",
                "feature": "asset_events"
            }
    
        logger.info("Registered all Airflow API v2 tools (43 common + 2 assets + 4 management = 49 tools)")
