Skip to main content
Glama

dag_graph

Retrieve task graph structure for Apache Airflow DAGs to visualize workflow dependencies and monitor task relationships.

Instructions

[Tool Role]: Retrieves task graph structure for the specified DAG.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
dag_idYes

Implementation Reference

  • The primary handler function for the 'dag_graph' MCP tool. Decorated with @mcp.tool() for automatic registration. Fetches DAG tasks via Airflow API, builds a graph structure with upstream/downstream dependencies, and returns nodes/edges for visualization.
    @mcp.tool()
    async def dag_graph(dag_id: str) -> Dict[str, Any]:
        """[Tool Role]: Retrieves task graph structure for the specified DAG."""
        if not dag_id:
            raise ValueError("dag_id must not be empty")
        resp = await airflow_request("GET", f"/dags/{dag_id}/tasks")
        resp.raise_for_status()
        tasks_data = resp.json()
        
        tasks = tasks_data.get("tasks", [])
        task_graph = {}
        
        for task in tasks:
            task_id = task.get("task_id")
            task_graph[task_id] = {
                "task_id": task_id,
                "task_type": task.get("class_ref", {}).get("class_name"),
                "downstream_task_ids": task.get("downstream_task_ids", []),
                "upstream_task_ids": task.get("upstream_task_ids", [])
            }
        
        return {
            "dag_id": dag_id,
            "task_graph": task_graph,
            "total_tasks": len(tasks),
            "task_relationships": {
                "nodes": list(task_graph.keys()),
                "edges": [(task_id, downstream) for task_id, task_info in task_graph.items() 
                         for downstream in task_info["downstream_task_ids"]]
            }
        }
  • Registration entry point for v1 API version. Sets the v1-specific airflow_request function and calls common_tools.register_common_tools(mcp), which defines and registers the dag_graph tool among others.
    def register_tools(mcp):
        """Register v1 tools by importing common tools with v1 request function."""
        
        logger.info("Initializing MCP server for Airflow API v1")
        logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
        
        # Set the global request function to v1
        common_tools.airflow_request = airflow_request_v1
        
        # Register all 56 common tools (includes management tools)
        common_tools.register_common_tools(mcp)
        
        # V1 has no exclusive tools - all tools are shared with v2
        
        logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
  • Registration entry point for v2 API version. Sets the v2-specific airflow_request function and calls common_tools.register_common_tools(mcp), which defines and registers the dag_graph tool among others.
    def register_tools(mcp):
        """Register v2 tools: common tools + v2-exclusive asset tools."""
        
        logger.info("Initializing MCP server for Airflow API v2")
        logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
        
        # Set the global request function to v2
        common_tools.airflow_request = airflow_request_v2
        
        # Register all 43 common tools
        common_tools.register_common_tools(mcp)
  • Top-level registration in main MCP server setup. Conditionally imports and calls v1_tools.register_tools or v2_tools.register_tools based on detected Airflow API version, which ultimately registers the dag_graph tool.
    if api_version == "v1":
        logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
        from mcp_airflow_api.tools import v1_tools
        v1_tools.register_tools(mcp_instance)
    elif api_version == "v2":
        logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
        from mcp_airflow_api.tools import v2_tools
        v2_tools.register_tools(mcp_instance)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/call518/MCP-Airflow-API'

If you have feedback or need assistance with the MCP directory API, please join our Discord server