dag_code
Retrieve source code for Apache Airflow DAGs to inspect workflow definitions and troubleshoot pipeline logic within MCP-Airflow-API.
Instructions
[Tool Role]: Retrieves the source code for the specified DAG.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | Yes | | |
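
The schema above has a single required string argument, `dag_id`. As a minimal sketch of how a client might invoke the tool, the snippet below builds a JSON-RPC 2.0 `tools/call` request as used by the Model Context Protocol. The helper name `build_dag_code_call` and the DAG id `example_dag` are hypothetical and only serve the illustration; transport and session setup are left out.

```python
import json
from typing import Any, Dict


def build_dag_code_call(dag_id: str, request_id: int = 1) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 'tools/call' request for the dag_code tool.

    Hypothetical helper: it only assembles the payload and sends nothing.
    """
    # Mirror the tool's own check: dag_id is required and must be non-empty.
    if not dag_id:
        raise ValueError("dag_id must not be empty")
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "dag_code", "arguments": {"dag_id": dag_id}},
    }


if __name__ == "__main__":
    # "example_dag" is a placeholder DAG id.
    print(json.dumps(build_dag_code_call("example_dag"), indent=2))
```

Validating `dag_id` on the client side is optional, since the handler rejects an empty value itself, but it avoids a round trip for an obviously invalid call.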
Implementation Reference
- Core handler function for the 'dag_code' tool. Fetches the source code of the specified DAG by making a GET request to the Airflow API endpoint '/dagSources/{dag_id}' and returns the JSON response.

      @mcp.tool()
      async def dag_code(dag_id: str) -> Dict[str, Any]:
          """[Tool Role]: Retrieves the source code for the specified DAG."""
          if not dag_id:
              raise ValueError("dag_id must not be empty")
          resp = await airflow_request("GET", f"/dagSources/{dag_id}")
          resp.raise_for_status()
          return resp.json()
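
To make the handler's control flow easier to follow without a running Airflow instance, here is a rough sketch that swaps the real request function for a stub. `FakeResponse` and `fake_airflow_request` are hypothetical stand-ins, not part of MCP-Airflow-API; the stub simply echoes the method and path it was given, so the output shows which endpoint the handler would hit.

```python
import asyncio
from typing import Any, Dict


class FakeResponse:
    """Hypothetical stand-in for an HTTP response object."""

    def __init__(self, status_code: int, payload: Dict[str, Any]) -> None:
        self.status_code = status_code
        self._payload = payload

    def raise_for_status(self) -> None:
        # Treat any 4xx/5xx status as an error, as HTTP clients usually do.
        if self.status_code >= 400:
            raise RuntimeError(f"HTTP {self.status_code}")

    def json(self) -> Dict[str, Any]:
        return self._payload


async def fake_airflow_request(method: str, path: str) -> FakeResponse:
    """Hypothetical stub: echoes the request instead of calling Airflow."""
    return FakeResponse(200, {"method": method, "path": path})


async def dag_code(dag_id: str) -> Dict[str, Any]:
    """Same flow as the handler above, wired to the stub request function."""
    if not dag_id:
        raise ValueError("dag_id must not be empty")
    resp = await fake_airflow_request("GET", f"/dagSources/{dag_id}")
    resp.raise_for_status()
    return resp.json()


if __name__ == "__main__":
    print(asyncio.run(dag_code("example_dag")))
```

The three steps are visible in order: reject an empty `dag_id`, issue the GET, then surface HTTP errors before returning the parsed body.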
- src/mcp_airflow_api/tools/v1_tools.py:13-27 (registration). Registration for Airflow v1 API tools, which sets the v1-specific airflow_request function and calls register_common_tools(mcp) to register 'dag_code' among other common tools.

      def register_tools(mcp):
          """Register v1 tools by importing common tools with v1 request function."""
          logger.info("Initializing MCP server for Airflow API v1")
          logger.info("Loading Airflow API v1 tools (Airflow 2.x)")

          # Set the global request function to v1
          common_tools.airflow_request = airflow_request_v1

          # Register all 56 common tools (includes management tools)
          common_tools.register_common_tools(mcp)

          # V1 has no exclusive tools - all tools are shared with v2
          logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
- src/mcp_airflow_api/tools/v2_tools.py:14-25 (registration). Registration for Airflow v2 API tools, which sets the v2-specific airflow_request function and calls register_common_tools(mcp) to register 'dag_code' among other common tools.

      def register_tools(mcp):
          """Register v2 tools: common tools + v2-exclusive asset tools."""
          logger.info("Initializing MCP server for Airflow API v2")
          logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")

          # Set the global request function to v2
          common_tools.airflow_request = airflow_request_v2

          # Register all 43 common tools
          common_tools.register_common_tools(mcp)
- Global airflow_request function pointer used by the dag_code handler, set during registration to the v1- or v2-specific implementation.

      # Global variable to hold the version-specific airflow_request function
      # This will be set by v1_tools.py or v2_tools.py during registration
      airflow_request = None
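
The registration snippets above rely on late binding: the shared module holds a placeholder that each version-specific module overwrites before any tool runs. The sketch below illustrates that pattern in isolation. It assumes the handler resolves the function through the shared namespace at call time; `common_tools` here is a `SimpleNamespace` standing in for the real module, and both request functions are hypothetical stubs that return strings rather than HTTP responses.

```python
import asyncio
from types import SimpleNamespace

# Stand-in for the shared module; the real one is a Python module, not a namespace.
common_tools = SimpleNamespace(airflow_request=None)


async def airflow_request_v1(method: str, path: str) -> str:
    """Hypothetical v1 stub: tags the call so the active version is visible."""
    return f"v1 {method} {path}"


async def airflow_request_v2(method: str, path: str) -> str:
    """Hypothetical v2 stub."""
    return f"v2 {method} {path}"


async def dag_code(dag_id: str) -> str:
    # Resolve the request function at call time, so whichever version
    # registered last is the one that handles the request.
    if common_tools.airflow_request is None:
        raise RuntimeError("no API version registered")
    return await common_tools.airflow_request("GET", f"/dagSources/{dag_id}")


def register_v1() -> None:
    common_tools.airflow_request = airflow_request_v1


def register_v2() -> None:
    common_tools.airflow_request = airflow_request_v2


if __name__ == "__main__":
    register_v1()
    print(asyncio.run(dag_code("example_dag")))
    register_v2()
    print(asyncio.run(dag_code("example_dag")))
```

One consequence of this design is that the tool definitions are written once and shared, while the choice of API version is made a single time at server start-up.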