failed_dags

Identify and review recently failed DAG runs in an Apache Airflow cluster, returning detailed information (dag_id, run_id, execution dates, and run state) for troubleshooting.

Instructions

[Tool Role]: Lists all recently failed DAG runs in the Airflow cluster.

Returns: List of failed DAG runs with comprehensive info per run: dag_id, dag_display_name, run_id, run_type, state, execution_date, start_date, end_date, data_interval_start, data_interval_end, external_trigger, conf, and note.
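
For orientation, here is an illustrative sketch of the returned structure. All field values below are invented for the example; the real shape is produced by the handler shown under Implementation Reference.

    # Illustrative return value only; every field value here is made up.
    example_result = {
        "dag_runs": [
            {
                "dag_id": "example_etl_daily",
                "dag_display_name": "example_etl_daily",
                "run_id": "scheduled__2024-01-01T00:00:00+00:00",
                "run_type": "scheduled",
                "state": "failed",
                "execution_date": "2024-01-01T00:00:00+00:00",
                "start_date": "2024-01-01T00:05:12+00:00",
                "end_date": "2024-01-01T00:07:43+00:00",
                "data_interval_start": "2023-12-31T00:00:00+00:00",
                "data_interval_end": "2024-01-01T00:00:00+00:00",
                "external_trigger": False,
                "conf": {},
                "note": None,
            }
        ],
        "total_failed": 1,
        "query_info": {
            "state_filter": "failed",
            "limit": 1000,
            "order_by": "start_date (descending)",
        },
    }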

Input Schema

No arguments
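
Because the tool takes no arguments, a client passes an empty argument object. A minimal sketch using the official Python MCP SDK, assuming an already-initialized ClientSession named session:

    # Minimal sketch: invoking the tool from an MCP client.
    # Assumes `session` is an initialized, connected mcp.ClientSession.
    result = await session.call_tool("failed_dags", {})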

Implementation Reference

  • The core handler function for the 'failed_dags' MCP tool. It queries the Airflow API for recently failed DAG runs (state=failed, limit=1000, ordered by start_date descending), extracts the relevant fields from each run, and returns a structured dictionary with the list of failed runs, a total count, and query metadata. The airflow_request helper it calls is not shown on this page; a hedged sketch of it follows this list.
    from typing import Any, Dict  # elided in the original excerpt

    async def failed_dags() -> Dict[str, Any]:
        """[Tool Role]: Lists all recently failed DAG runs in the Airflow cluster."""
        resp = await airflow_request("GET", "/dags/~/dagRuns?state=failed&limit=1000&order_by=-start_date")
        resp.raise_for_status()
        data = resp.json()

        failed_runs = []
        for run in data.get("dag_runs", []):
            run_info = {
                "dag_id": run.get("dag_id"),
                "dag_display_name": run.get("dag_display_name"),
                "run_id": run.get("run_id"),
                "run_type": run.get("run_type"),
                "state": run.get("state"),
                "execution_date": run.get("execution_date"),
                "start_date": run.get("start_date"),
                "end_date": run.get("end_date"),
                "data_interval_start": run.get("data_interval_start"),
                "data_interval_end": run.get("data_interval_end"),
                "external_trigger": run.get("external_trigger"),
                "conf": run.get("conf"),
                "note": run.get("note"),
            }
            failed_runs.append(run_info)

        return {
            "dag_runs": failed_runs,
            "total_failed": len(failed_runs),
            "query_info": {
                "state_filter": "failed",
                "limit": 1000,
                "order_by": "start_date (descending)",
            },
        }
  • Registration entry point for Airflow API v1 tools. Sets the v1-specific airflow_request function and calls register_common_tools(mcp), which defines and registers the failed_dags tool among others.
    def register_tools(mcp):
        """Register v1 tools by importing common tools with v1 request function."""
        logger.info("Initializing MCP server for Airflow API v1")
        logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
        # Set the global request function to v1
        common_tools.airflow_request = airflow_request_v1
        # Register all 56 common tools (includes management tools)
        common_tools.register_common_tools(mcp)
        # V1 has no exclusive tools - all tools are shared with v2
        logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
  • Registration entry point for Airflow API v2 tools. Sets the v2-specific airflow_request function and calls register_common_tools(mcp), which defines and registers the failed_dags tool among others.
    def register_tools(mcp):
        """Register v2 tools: common tools + v2-exclusive asset tools."""
        logger.info("Initializing MCP server for Airflow API v2")
        logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
        # Set the global request function to v2
        common_tools.airflow_request = airflow_request_v2
        # Register all 43 common tools
        common_tools.register_common_tools(mcp)
  • The function that defines all common tools (including failed_dags) with @mcp.tool() decorators and registers them with the MCP server instance. Called by both the v1_tools and v2_tools register functions. A minimal sketch of the decorator binding follows this list.
    def register_common_tools(mcp):
        """Register all 43 common tools that work with both v1 and v2 APIs."""
        if airflow_request is None:
            raise RuntimeError("airflow_request function must be set before registering common tools")
        logger.info("Registering common tools shared between v1 and v2")
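
The handler in the first entry delegates HTTP access to an airflow_request helper that is not shown on this page. Its raise_for_status()/.json() usage suggests an async HTTP client such as httpx; the following is a minimal sketch under that assumption, with hypothetical environment variable names and base URL:

    import os
    import httpx

    # Hypothetical helper: sends one request to the Airflow REST API and
    # returns the raw httpx.Response; callers run raise_for_status()/.json().
    async def airflow_request(method: str, path: str) -> httpx.Response:
        base_url = os.environ.get("AIRFLOW_API_URL", "http://localhost:8080/api/v1")
        auth = (os.environ["AIRFLOW_USERNAME"], os.environ["AIRFLOW_PASSWORD"])
        async with httpx.AsyncClient(auth=auth) as client:
            # Non-streaming responses are fully read before the client closes,
            # so the body remains available to the caller.
            return await client.request(method, f"{base_url}{path}")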
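
Both register_tools entry points expect an MCP server instance to attach tools to. A hedged wiring sketch, assuming a FastMCP server object (consistent with the @mcp.tool() decorators mentioned above) and that the v1_tools module is importable as shown:

    from mcp.server.fastmcp import FastMCP

    import v1_tools  # module providing register_tools, per the entries above

    # Hypothetical startup wiring: create the server, register the
    # version-appropriate tool set (v1 for Airflow 2.x, v2 for 3.0+), run.
    mcp = FastMCP("MCP-Airflow-API")
    v1_tools.register_tools(mcp)  # or v2_tools.register_tools(mcp)
    mcp.run()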
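
Finally, register_common_tools defines each tool inline and binds it with an @mcp.tool() decorator, per the last entry. A minimal sketch of that pattern; the handler body is elided here (see the full version in the first entry):

    from typing import Any, Dict

    # Module-level request function, set by v1_tools/v2_tools before registration.
    airflow_request = None

    def register_common_tools(mcp):
        if airflow_request is None:
            raise RuntimeError("airflow_request function must be set before registering common tools")

        # Defining the coroutine inside the call and decorating it binds
        # the tool to this particular MCP server instance.
        @mcp.tool()
        async def failed_dags() -> Dict[str, Any]:
            """[Tool Role]: Lists all recently failed DAG runs in the Airflow cluster."""
            ...  # body as shown in the first entry above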

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/call518/MCP-Airflow-API'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.