dag_calendar
View DAG execution schedules and calendar for specified date ranges to monitor workflow timing in Apache Airflow.
Instructions
[Tool Role]: Shows DAG schedule and execution calendar for a date range.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
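| dag_id | Yes | ID of the DAG whose runs are listed; must not be empty. | |
| start_date | Yes | Start of the date range; sent to the Airflow API as the start_date_gte filter. | |
| end_date | Yes | End of the date range; sent to the Airflow API as the start_date_lte filter. | |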
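
As an illustration of the three required inputs, the sketch below builds an argument dictionary for a dag_calendar call. It assumes the dates are passed as ISO 8601 strings, which is the format the Airflow REST API generally accepts for date filters. The helper name `build_calendar_args` and the DAG id `example_dag` are hypothetical and not part of the tool.

```python
from datetime import datetime, timezone


def build_calendar_args(dag_id: str, start: datetime, end: datetime) -> dict:
    """Build the three required dag_calendar arguments (illustrative helper)."""
    if not dag_id:
        raise ValueError("dag_id must not be empty")
    if start > end:
        raise ValueError("start_date must not be after end_date")
    return {
        "dag_id": dag_id,
        # Assumption: the tool forwards these strings unchanged to Airflow,
        # so they are rendered as ISO 8601 timestamps here.
        "start_date": start.isoformat(),
        "end_date": end.isoformat(),
    }


args = build_calendar_args(
    "example_dag",  # hypothetical DAG id
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2024, 1, 31, 23, 59, 59, tzinfo=timezone.utc),
)
print(args)
```

Checking that the range is ordered before calling the tool is a design choice of this sketch; the handler itself only rejects an empty dag_id.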
Implementation Reference
- The core handler implementation for the 'dag_calendar' MCP tool. This function is defined inside register_common_tools(mcp) and uses the @mcp.tool() decorator for automatic registration. It queries the Airflow API for DAG runs in the given date range and returns formatted calendar data including execution dates, states, and run types.

      @mcp.tool()
      async def dag_calendar(dag_id: str, start_date: str, end_date: str) -> Dict[str, Any]:
          """[Tool Role]: Shows DAG schedule and execution calendar for a date range."""
          if not dag_id:
              raise ValueError("dag_id must not be empty")
          params = {
              'start_date_gte': start_date,
              'start_date_lte': end_date,
              'limit': 1000
          }
          query_string = "&".join([f"{k}={v}" for k, v in params.items()])
          resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?{query_string}")
          resp.raise_for_status()
          data = resp.json()
          calendar_data = []
          for run in data.get("dag_runs", []):
              calendar_data.append({
                  "execution_date": run.get("execution_date"),
                  "start_date": run.get("start_date"),
                  "end_date": run.get("end_date"),
                  "state": run.get("state"),
                  "run_type": run.get("run_type")
              })
          return {
              "dag_id": dag_id,
              "date_range": {"start": start_date, "end": end_date},
              "calendar_entries": calendar_data,
              "total_runs": len(calendar_data)
          }
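
The handler joins parameter values into the query string without escaping them, so a timezone offset such as `+00:00` would be sent with a literal `+`, which many servers decode as a space. A minimal sketch of an escaped variant follows, using only the Python standard library. The helper name `build_dag_runs_path` is hypothetical, and whether `airflow_request` already handles escaping is not shown in the source, so treat this as an assumption rather than a required fix.

```python
from urllib.parse import quote, urlencode


def build_dag_runs_path(dag_id: str, start_date: str, end_date: str,
                        limit: int = 1000) -> str:
    """Return the dagRuns request path with a percent-encoded query string."""
    if not dag_id:
        raise ValueError("dag_id must not be empty")
    params = {
        "start_date_gte": start_date,
        "start_date_lte": end_date,
        "limit": limit,
    }
    # quote() escapes characters in the DAG id that are unsafe in a path
    # segment; urlencode() escapes ':' and '+' in the timestamp values.
    return f"/dags/{quote(dag_id, safe='')}/dagRuns?{urlencode(params)}"


path = build_dag_runs_path(
    "example_dag",  # hypothetical DAG id
    "2024-01-01T00:00:00+00:00",
    "2024-01-31T23:59:59+00:00",
)
print(path)
```

The resulting path could be passed to the request helper in place of the hand-built f-string, leaving the rest of the handler unchanged.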