Skip to main content
Glama

dag_run_duration

Analyze DAG run durations by retrieving statistics for a specific DAG ID, customizable with a limit on recent runs to improve data accuracy and insights.

Instructions

[Tool Role]: Retrieves run duration statistics for a specific DAG.

Args: dag_id: The DAG ID to get run durations for limit: Maximum number of recent runs to analyze (default: 50, increased from 10 for better statistics)

Returns: DAG run duration data: dag_id, runs, statistics

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
dag_idYes
limitNo

Implementation Reference

  • The core handler function for the 'dag_run_duration' tool. It queries the Airflow API for recent DAG runs, computes duration for each successful run (with both start and end dates), and returns detailed run durations along with aggregate statistics like average duration, fastest, and slowest runs.
    @mcp.tool() async def dag_run_duration(dag_id: str, limit: int = 10) -> Dict[str, Any]: """[Tool Role]: Analyzes DAG run durations and performance metrics.""" if not dag_id: raise ValueError("dag_id must not be empty") resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit={limit}&order_by=-execution_date") resp.raise_for_status() data = resp.json() runs = data.get("dag_runs", []) durations = [] for run in runs: start_date = run.get("start_date") end_date = run.get("end_date") if start_date and end_date: from datetime import datetime start = datetime.fromisoformat(start_date.replace('Z', '+00:00')) end = datetime.fromisoformat(end_date.replace('Z', '+00:00')) duration_seconds = (end - start).total_seconds() durations.append({ "run_id": run.get("run_id"), "duration_seconds": duration_seconds, "state": run.get("state"), "execution_date": run.get("execution_date") }) avg_duration = sum(d["duration_seconds"] for d in durations) / len(durations) if durations else 0 return { "dag_id": dag_id, "run_durations": durations, "statistics": { "average_duration_seconds": avg_duration, "total_analyzed_runs": len(durations), "fastest_run": min(durations, key=lambda x: x["duration_seconds"]) if durations else None, "slowest_run": max(durations, key=lambda x: x["duration_seconds"]) if durations else None } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/call518/MCP-Airflow-API'

If you have feedback or need assistance with the MCP directory API, please join our Discord server