dag_run_duration
Analyze DAG run durations by retrieving duration statistics for a specific DAG ID. An optional limit sets how many recent runs are analyzed; a larger limit bases the statistics on more runs.
Instructions
[Tool Role]: Retrieves run duration statistics for a specific DAG.
Args:
- dag_id: The DAG ID to get run durations for.
- limit: Maximum number of recent runs to analyze (default: 50, increased from 10 for better statistics).

Returns: DAG run duration data: dag_id, runs, statistics.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | Yes | The DAG ID to get run durations for | |
| limit | No | Maximum number of recent runs to analyze | |
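Both inputs end up in a single Airflow REST API request for the DAG's runs, newest first. A minimal sketch of building that request path, using a hypothetical helper named `build_dag_runs_path`. The positive-`limit` check and the URL-encoding via `urllib.parse` are assumptions added here; the implementation reference interpolates both values directly into an f-string.

```python
from urllib.parse import quote, urlencode


def build_dag_runs_path(dag_id: str, limit: int) -> str:
    """Hypothetical helper: relative API path for the most recent runs of a DAG."""
    # Same guard as the reference handler: an empty dag_id is rejected.
    if not dag_id:
        raise ValueError("dag_id must not be empty")
    # Assumption (not in the original): reject non-positive limits early.
    if limit < 1:
        raise ValueError("limit must be a positive integer")
    # Newest runs first, matching order_by=-execution_date in the handler.
    query = urlencode({"limit": limit, "order_by": "-execution_date"})
    # quote(..., safe="") percent-encodes any character that is not URL-safe.
    return f"/dags/{quote(dag_id, safe='')}/dagRuns?{query}"


print(build_dag_runs_path("example_dag", 50))
# /dags/example_dag/dagRuns?limit=50&order_by=-execution_date
```

Encoding the DAG ID is a defensive choice: valid Airflow DAG IDs are normally URL-safe already, so for typical IDs the result is identical to the f-string in the handler.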
Implementation Reference
- The core handler function for the `dag_run_duration` tool. It queries the Airflow REST API for the most recent DAG runs (newest first), computes the duration of every run that has both a start date and an end date (regardless of its state), and returns the per-run durations together with aggregate statistics: average duration, number of runs analyzed, and the fastest and slowest runs.

```python
from datetime import datetime
from typing import Any, Dict

# `mcp` (the MCP server instance) and `airflow_request` (an async helper that
# calls the Airflow REST API) are defined elsewhere in the project.


@mcp.tool()
async def dag_run_duration(dag_id: str, limit: int = 10) -> Dict[str, Any]:
    """[Tool Role]: Analyzes DAG run durations and performance metrics."""
    if not dag_id:
        raise ValueError("dag_id must not be empty")

    # Fetch the most recent runs, newest first.
    resp = await airflow_request(
        "GET", f"/dags/{dag_id}/dagRuns?limit={limit}&order_by=-execution_date"
    )
    resp.raise_for_status()
    data = resp.json()
    runs = data.get("dag_runs", [])

    durations = []
    for run in runs:
        start_date = run.get("start_date")
        end_date = run.get("end_date")
        # Only runs with both timestamps have a measurable duration;
        # queued or still-running runs are skipped.
        if start_date and end_date:
            # Replace a trailing "Z" so fromisoformat() accepts it on Python < 3.11.
            start = datetime.fromisoformat(start_date.replace("Z", "+00:00"))
            end = datetime.fromisoformat(end_date.replace("Z", "+00:00"))
            durations.append({
                "run_id": run.get("run_id"),
                "duration_seconds": (end - start).total_seconds(),
                "state": run.get("state"),
                "execution_date": run.get("execution_date"),
            })

    avg_duration = (
        sum(d["duration_seconds"] for d in durations) / len(durations)
        if durations else 0
    )
    return {
        "dag_id": dag_id,
        "run_durations": durations,
        "statistics": {
            "average_duration_seconds": avg_duration,
            "total_analyzed_runs": len(durations),
            "fastest_run": min(durations, key=lambda x: x["duration_seconds"]) if durations else None,
            "slowest_run": max(durations, key=lambda x: x["duration_seconds"]) if durations else None,
        },
    }
```
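To check the duration arithmetic without a running Airflow instance, the core loop of the handler can be pulled out into a pure function. This is a sketch under stated assumptions, not part of the project: `summarize_run_durations` and `parse_airflow_timestamp` are hypothetical names, and the input is assumed to be the `dag_runs` list from the API response. Like the handler, it keeps every run that has both timestamps, whatever its state.

```python
from datetime import datetime
from typing import Any, Dict, List


def parse_airflow_timestamp(value: str) -> datetime:
    # Same normalization as the handler: a trailing "Z" becomes "+00:00"
    # so datetime.fromisoformat() also accepts it on Python < 3.11.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def summarize_run_durations(runs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical helper: per-run durations plus aggregate statistics."""
    durations: List[Dict[str, Any]] = []
    for run in runs:
        start_date, end_date = run.get("start_date"), run.get("end_date")
        # Skip runs without both timestamps (e.g. queued or still running).
        if not (start_date and end_date):
            continue
        elapsed = parse_airflow_timestamp(end_date) - parse_airflow_timestamp(start_date)
        durations.append({
            "run_id": run.get("run_id"),
            "duration_seconds": elapsed.total_seconds(),
            "state": run.get("state"),
        })

    def by_duration(d: Dict[str, Any]) -> float:
        return d["duration_seconds"]

    if durations:
        stats = {
            "average_duration_seconds": sum(map(by_duration, durations)) / len(durations),
            "total_analyzed_runs": len(durations),
            "fastest_run": min(durations, key=by_duration),
            "slowest_run": max(durations, key=by_duration),
        }
    else:
        # Mirrors the handler's fallbacks when no run is measurable.
        stats = {"average_duration_seconds": 0, "total_analyzed_runs": 0,
                 "fastest_run": None, "slowest_run": None}
    return {"run_durations": durations, "statistics": stats}


# Made-up sample input shaped like the API's "dag_runs" list.
sample_runs = [
    {"run_id": "a", "state": "success",
     "start_date": "2024-01-01T00:00:00Z", "end_date": "2024-01-01T00:01:00Z"},
    {"run_id": "b", "state": "failed",
     "start_date": "2024-01-01T01:00:00+00:00", "end_date": "2024-01-01T01:03:00+00:00"},
    {"run_id": "c", "state": "running",
     "start_date": "2024-01-01T02:00:00Z", "end_date": None},
]
summary = summarize_run_durations(sample_runs)
print(summary["statistics"]["average_duration_seconds"])  # 120.0
```

Run `c` has no end date, so only `a` (60 s) and `b` (180 s) are analyzed. Note that failed runs count toward the statistics, exactly as in the handler; filtering on `state == "success"` would be a separate design choice.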