dag_graph
Retrieve task graph structure for Apache Airflow DAGs to visualize workflow dependencies and monitor task relationships.
Instructions
[Tool Role]: Retrieves task graph structure for the specified DAG.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | Yes | | |
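
Since `dag_id` is the only argument, a call to this tool is small. As a rough sketch, assuming the tool is invoked through MCP's JSON-RPC `tools/call` method, the request body could be built as shown below. The helper `build_dag_graph_call`, the request `id`, and the DAG name `example_dag` are hypothetical and chosen for illustration only; they are not part of this project.

```python
import json
from typing import Any, Dict


def build_dag_graph_call(dag_id: str, request_id: int = 1) -> Dict[str, Any]:
    """Build a JSON-RPC request body that calls the dag_graph tool.

    Hypothetical helper for illustration; not part of mcp_airflow_api.
    """
    # Mirror the handler's own check so an empty ID fails before sending.
    if not dag_id:
        raise ValueError("dag_id must not be empty")
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "dag_graph", "arguments": {"dag_id": dag_id}},
    }


# "example_dag" is a placeholder DAG name.
print(json.dumps(build_dag_graph_call("example_dag"), indent=2))
```

In practice an MCP client library would normally construct this message, so the sketch is mainly useful for seeing which fields carry the tool name and the `dag_id` argument.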
Implementation Reference
- The primary handler function for the 'dag_graph' MCP tool. Decorated with @mcp.tool() for automatic registration. Fetches DAG tasks via the Airflow API, builds a graph structure with upstream/downstream dependencies, and returns nodes/edges for visualization.

      @mcp.tool()
      async def dag_graph(dag_id: str) -> Dict[str, Any]:
          """[Tool Role]: Retrieves task graph structure for the specified DAG."""
          if not dag_id:
              raise ValueError("dag_id must not be empty")
          resp = await airflow_request("GET", f"/dags/{dag_id}/tasks")
          resp.raise_for_status()
          tasks_data = resp.json()
          tasks = tasks_data.get("tasks", [])
          task_graph = {}
          for task in tasks:
              task_id = task.get("task_id")
              task_graph[task_id] = {
                  "task_id": task_id,
                  "task_type": task.get("class_ref", {}).get("class_name"),
                  "downstream_task_ids": task.get("downstream_task_ids", []),
                  "upstream_task_ids": task.get("upstream_task_ids", [])
              }
          return {
              "dag_id": dag_id,
              "task_graph": task_graph,
              "total_tasks": len(tasks),
              "task_relationships": {
                  "nodes": list(task_graph.keys()),
                  "edges": [(task_id, downstream)
                            for task_id, task_info in task_graph.items()
                            for downstream in task_info["downstream_task_ids"]]
              }
          }
- src/mcp_airflow_api/tools/v1_tools.py:13-28 (registration): Registration entry point for the v1 API version. Sets the v1-specific airflow_request function and calls common_tools.register_common_tools(mcp), which defines and registers the dag_graph tool among others.

      def register_tools(mcp):
          """Register v1 tools by importing common tools with v1 request function."""
          logger.info("Initializing MCP server for Airflow API v1")
          logger.info("Loading Airflow API v1 tools (Airflow 2.x)")

          # Set the global request function to v1
          common_tools.airflow_request = airflow_request_v1

          # Register all 56 common tools (includes management tools)
          common_tools.register_common_tools(mcp)

          # V1 has no exclusive tools - all tools are shared with v2
          logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
- src/mcp_airflow_api/tools/v2_tools.py:14-25 (registration): Registration entry point for the v2 API version. Sets the v2-specific airflow_request function and calls common_tools.register_common_tools(mcp), which defines and registers the dag_graph tool among others.

      def register_tools(mcp):
          """Register v2 tools: common tools + v2-exclusive asset tools."""
          logger.info("Initializing MCP server for Airflow API v2")
          logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")

          # Set the global request function to v2
          common_tools.airflow_request = airflow_request_v2

          # Register all 43 common tools
          common_tools.register_common_tools(mcp)
- src/mcp_airflow_api/mcp_main.py:263-270 (registration): Top-level registration in the main MCP server setup. Conditionally imports and calls v1_tools.register_tools or v2_tools.register_tools based on the detected Airflow API version, which ultimately registers the dag_graph tool.

      if api_version == "v1":
          logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
          from mcp_airflow_api.tools import v1_tools
          v1_tools.register_tools(mcp_instance)
      elif api_version == "v2":
          logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
          from mcp_airflow_api.tools import v2_tools
          v2_tools.register_tools(mcp_instance)
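
The `task_relationships` structure returned by the handler lends itself to standard graph processing. As a minimal sketch, assuming a hypothetical three-task response, the edges can be turned into one valid execution order with Python's standard-library `graphlib` module. The task IDs, operator names, and the helper `execution_order` are made up for illustration and are not part of this project.

```python
from graphlib import TopologicalSorter
from typing import Any, Dict, List

# Hypothetical sample shaped like the dag_graph return value; not real output.
sample_result: Dict[str, Any] = {
    "dag_id": "example_dag",
    "task_graph": {
        "extract": {"task_id": "extract", "task_type": "PythonOperator",
                    "downstream_task_ids": ["transform"], "upstream_task_ids": []},
        "transform": {"task_id": "transform", "task_type": "PythonOperator",
                      "downstream_task_ids": ["load"], "upstream_task_ids": ["extract"]},
        "load": {"task_id": "load", "task_type": "PythonOperator",
                 "downstream_task_ids": [], "upstream_task_ids": ["transform"]},
    },
    "total_tasks": 3,
    "task_relationships": {
        "nodes": ["extract", "transform", "load"],
        "edges": [("extract", "transform"), ("transform", "load")],
    },
}


def execution_order(result: Dict[str, Any]) -> List[str]:
    """Return one topological ordering of the tasks, upstream before downstream."""
    relationships = result["task_relationships"]
    sorter: TopologicalSorter = TopologicalSorter()
    for node in relationships["nodes"]:
        sorter.add(node)  # also registers tasks that have no edges
    for upstream, downstream in relationships["edges"]:
        # add(node, *predecessors): the downstream task depends on the upstream one
        sorter.add(downstream, upstream)
    return list(sorter.static_order())


print(execution_order(sample_result))
```

The handler builds each edge as a Python tuple, but after JSON serialization a client would typically receive two-element lists instead; the unpacking in the loop works for either form.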