
running_dags

Monitor active DAG runs in an Apache Airflow cluster to track workflow execution status and identify which pipelines are currently processing.

Instructions

[Tool Role]: Lists all currently running DAG runs in the Airflow cluster.

Input Schema


No arguments
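
Because the tool takes no input, the shape of the result is fixed by the handler's return statement (see Implementation Reference below). The following is an illustrative return value; every key comes from the handler code, but the concrete field values are hypothetical.

```python
# Illustrative running_dags() result -- keys match the handler, values are made up.
{
    "dag_runs": [
        {
            "dag_id": "daily_etl",
            "dag_display_name": "daily_etl",
            "run_id": "scheduled__2024-01-01T00:00:00+00:00",
            "run_type": "scheduled",
            "state": "running",
            "execution_date": "2024-01-01T00:00:00+00:00",
            "start_date": "2024-01-01T00:05:12+00:00",
            "end_date": None,
            "data_interval_start": "2023-12-31T00:00:00+00:00",
            "data_interval_end": "2024-01-01T00:00:00+00:00",
            "external_trigger": False,
            "conf": {},
            "note": None,
        }
    ],
    "total_running": 1,
    "query_info": {
        "state_filter": "running",
        "limit": 1000,
        "order_by": "start_date (descending)",
    },
}
```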

Implementation Reference

  • Core handler implementation for the 'running_dags' tool. Fetches running DAG runs from the Airflow API endpoint '/dags/~/dagRuns?state=running', processes the response, and returns structured data including run details and summary statistics. (A sketch of the airflow_request helper it relies on follows this list.)

```python
# Defined inside register_common_tools(mcp); uses typing.Dict/Any and the
# module-level airflow_request helper.
@mcp.tool()
async def running_dags() -> Dict[str, Any]:
    """[Tool Role]: Lists all currently running DAG runs in the Airflow cluster."""
    resp = await airflow_request(
        "GET", "/dags/~/dagRuns?state=running&limit=1000&order_by=-start_date"
    )
    resp.raise_for_status()
    data = resp.json()

    running_runs = []
    for run in data.get("dag_runs", []):
        run_info = {
            "dag_id": run.get("dag_id"),
            "dag_display_name": run.get("dag_display_name"),
            "run_id": run.get("run_id"),
            "run_type": run.get("run_type"),
            "state": run.get("state"),
            "execution_date": run.get("execution_date"),
            "start_date": run.get("start_date"),
            "end_date": run.get("end_date"),
            "data_interval_start": run.get("data_interval_start"),
            "data_interval_end": run.get("data_interval_end"),
            "external_trigger": run.get("external_trigger"),
            "conf": run.get("conf"),
            "note": run.get("note"),
        }
        running_runs.append(run_info)

    return {
        "dag_runs": running_runs,
        "total_running": len(running_runs),
        "query_info": {
            "state_filter": "running",
            "limit": 1000,
            "order_by": "start_date (descending)",
        },
    }
```
  • Registration entry point for the v1 API version. Sets the v1-specific airflow_request function and calls register_common_tools(mcp), which defines and registers the running_dags handler.

```python
def register_tools(mcp):
    """Register v1 tools by importing common tools with v1 request function."""
    logger.info("Initializing MCP server for Airflow API v1")
    logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
    # Set the global request function to v1
    common_tools.airflow_request = airflow_request_v1
    # Register all 56 common tools (includes management tools)
    common_tools.register_common_tools(mcp)
    # V1 has no exclusive tools - all tools are shared with v2
    logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
```
  • Registration entry point for the v2 API version. Sets the v2-specific airflow_request function and calls register_common_tools(mcp), which defines and registers the running_dags handler.

```python
def register_tools(mcp):
    """Register v2 tools: common tools + v2-exclusive asset tools."""
    logger.info("Initializing MCP server for Airflow API v2")
    logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
    # Set the global request function to v2
    common_tools.airflow_request = airflow_request_v2
    # Register all 43 common tools
    common_tools.register_common_tools(mcp)
```
  • The register_common_tools function, in which all common tools (including running_dags) are defined and registered via the @mcp.tool() decorator when it is called from v1_tools/v2_tools. (A hypothetical end-to-end wiring sketch follows this list.)

```python
def register_common_tools(mcp):
    """Register all 43 common tools that work with both v1 and v2 APIs."""
```
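
The running_dags handler awaits a module-level airflow_request callable and then calls raise_for_status() and .json() on the result, so the helper evidently returns an httpx-style response object. The project's actual helper is not shown on this page; below is a minimal sketch of what a v1 implementation could look like, assuming httpx and environment-variable configuration (AIRFLOW_API_URL, AIRFLOW_API_USERNAME, and AIRFLOW_API_PASSWORD are assumed names, not confirmed from the source).

```python
import os

import httpx

# Hypothetical configuration -- the real project may use different names.
AIRFLOW_API_URL = os.environ.get("AIRFLOW_API_URL", "http://localhost:8080/api/v1")
AIRFLOW_API_USERNAME = os.environ.get("AIRFLOW_API_USERNAME", "airflow")
AIRFLOW_API_PASSWORD = os.environ.get("AIRFLOW_API_PASSWORD", "airflow")


async def airflow_request_v1(method: str, path: str, **kwargs) -> httpx.Response:
    """Send a request to the Airflow 2.x stable REST API and return the raw response.

    The caller (e.g. running_dags) is responsible for raise_for_status()/.json().
    """
    async with httpx.AsyncClient(
        base_url=AIRFLOW_API_URL,
        auth=(AIRFLOW_API_USERNAME, AIRFLOW_API_PASSWORD),
        timeout=30.0,
    ) as client:
        return await client.request(method, path, **kwargs)
```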
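
Putting the registration pattern together: register_tools points common_tools.airflow_request at the version-specific client and then registers the shared tools on the server instance. A minimal, hypothetical entry point using the MCP Python SDK's FastMCP (which provides the @mcp.tool() decorator seen above) might look like the sketch below; the module and file names are assumptions that follow the descriptions in this list.

```python
# server.py -- hypothetical wiring, assuming the MCP Python SDK (FastMCP).
from mcp.server.fastmcp import FastMCP

import v1_tools  # assumed module exposing register_tools() for Airflow API v1

mcp = FastMCP("airflow")

# Points common_tools.airflow_request at the v1 client, then registers
# running_dags and the other shared tools via register_common_tools(mcp).
v1_tools.register_tools(mcp)

if __name__ == "__main__":
    mcp.run()
```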

