MCP Server for Apache Airflow

by yangkyeongmo

fetch_dags

Retrieve and filter DAGs from Apache Airflow using customizable parameters like tags, status, and naming patterns for efficient workflow management.

Instructions

Fetch all DAGs

Input Schema

Name            Required  Description  Default
dag_id_pattern  No
limit           No
offset          No
only_active     No
order_by        No
paused          No
tags            No
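
As a rough illustration of how these optional parameters might be supplied, the sketch below builds an MCP `tools/call` request for `fetch_dags`. The parameter names come from the schema above and their types from the handler signature in the Implementation Reference; the argument values (`etl`, `sales_`, `dag_id`) and the request `id` are made-up placeholders, not values taken from this server.

```python
import json

# Hypothetical argument values; every parameter is optional and may be omitted.
arguments = {
    "tags": ["etl"],             # assumed tag name
    "only_active": True,
    "paused": False,
    "dag_id_pattern": "sales_",  # assumed naming pattern
    "order_by": "dag_id",        # assumed sort field
    "limit": 10,
    "offset": 0,
}

# JSON-RPC 2.0 envelope that MCP clients use to invoke a tool.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {"name": "fetch_dags", "arguments": arguments},
}

payload = json.dumps(request, indent=2)
print(payload)
```

Most MCP client libraries build this envelope themselves, so in practice only the `arguments` mapping usually needs to be written by hand.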

Implementation Reference

  • The main handler function for the "fetch_dags" MCP tool. It accepts optional parameters for filtering DAGs, calls the Airflow DAG API, enriches the response with UI URLs using the helper get_dag_url, and returns the result as MCP TextContent.
    async def get_dags(
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        order_by: Optional[str] = None,
        tags: Optional[List[str]] = None,
        only_active: Optional[bool] = None,
        paused: Optional[bool] = None,
        dag_id_pattern: Optional[str] = None,
    ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
        # Build parameters dictionary
        kwargs: Dict[str, Any] = {}
        if limit is not None:
            kwargs["limit"] = limit
        if offset is not None:
            kwargs["offset"] = offset
        if order_by is not None:
            kwargs["order_by"] = order_by
        if tags is not None:
            kwargs["tags"] = tags
        if only_active is not None:
            kwargs["only_active"] = only_active
        if paused is not None:
            kwargs["paused"] = paused
        if dag_id_pattern is not None:
            kwargs["dag_id_pattern"] = dag_id_pattern

        # Use the client to fetch DAGs
        response = dag_api.get_dags(**kwargs)

        # Convert response to dictionary for easier manipulation
        response_dict = response.to_dict()

        # Add UI links to each DAG
        for dag in response_dict.get("dags", []):
            dag["ui_url"] = get_dag_url(dag["dag_id"])

        return [types.TextContent(type="text", text=str(response_dict))]
  • Module-level registration function that lists all DAG-related tools, including the tuple for "fetch_dags" which maps the tool name to the get_dags handler function, description, and read-only flag. This is imported into src/main.py for top-level MCP registration.
    def get_all_functions() -> list[tuple[Callable, str, str, bool]]:
        """Return list of (function, name, description, is_read_only) tuples for registration."""
        return [
            (get_dags, "fetch_dags", "Fetch all DAGs", True),
            (get_dag, "get_dag", "Get a DAG by ID", True),
            (get_dag_details, "get_dag_details", "Get a simplified representation of DAG", True),
            (get_dag_source, "get_dag_source", "Get a source code", True),
            (pause_dag, "pause_dag", "Pause a DAG by ID", False),
            (unpause_dag, "unpause_dag", "Unpause a DAG by ID", False),
            (get_dag_tasks, "get_dag_tasks", "Get tasks for DAG", True),
            (get_task, "get_task", "Get a task by ID", True),
            (get_tasks, "get_tasks", "Get tasks for DAG", True),
            (patch_dag, "patch_dag", "Update a DAG", False),
            (patch_dags, "patch_dags", "Update multiple DAGs", False),
            (delete_dag, "delete_dag", "Delete a DAG", False),
            (clear_task_instances, "clear_task_instances", "Clear a set of task instances", False),
            (set_task_instances_state, "set_task_instances_state", "Set a state of task instances", False),
            (reparse_dag_file, "reparse_dag_file", "Request re-parsing of a DAG file", False),
        ]
  • Helper function used by the fetch_dags handler to generate UI URLs for each DAG in the response.
    def get_dag_url(dag_id: str) -> str:
        return f"{AIRFLOW_HOST}/dags/{dag_id}/grid"
  • src/main.py:80-107 (registration)
    Top-level MCP tool registration in the main entrypoint. For each selected API, it looks up the matching registration function (get_all_functions from dag.py, among others), calls it to get that API's tool list, optionally keeps only the read-only tools, and registers each tool with the app using fastmcp.tools.Tool.from_function.
    def main(transport: str, mcp_host: str, mcp_port: int, apis: list[str], read_only: bool) -> None:
        from src.server import app

        for api in apis:
            logging.debug(f"Adding API: {api}")
            get_function = APITYPE_TO_FUNCTIONS[APIType(api)]
            try:
                functions = get_function()
            except NotImplementedError:
                continue

            # Filter functions for read-only mode if requested
            if read_only:
                functions = filter_functions_for_read_only(functions)

            for func, name, description, *_ in functions:
                app.add_tool(Tool.from_function(func, name=name, description=description))

        logging.debug(f"Starting MCP server for Apache Airflow with {transport} transport")
        params_to_run = {}

        if transport in {"sse", "http"}:
            if transport == "sse":
                logging.warning("NOTE: the SSE transport is going to be deprecated.")

            params_to_run = {"port": int(mcp_port), "host": mcp_host}

        app.run(transport=transport, **params_to_run)
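
The pieces above can be tied together in a small, self-contained sketch. It is not the project's code: `FakeDagApi`, `build_kwargs`, `filter_read_only`, and the `AIRFLOW_HOST` value are hypothetical stand-ins, and the read-only filter is only a guess at what `filter_functions_for_read_only` does, based on the `is_read_only` flag carried in each registration tuple.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

AIRFLOW_HOST = "http://localhost:8080"  # assumed value for illustration


def get_dag_url(dag_id: str) -> str:
    """Same URL shape as the helper shown above."""
    return f"{AIRFLOW_HOST}/dags/{dag_id}/grid"


def build_kwargs(**params: Any) -> Dict[str, Any]:
    """Hypothetical helper: forward only the parameters the caller set."""
    return {key: value for key, value in params.items() if value is not None}


class FakeDagApi:
    """Stand-in for the Airflow client; returns a canned response."""

    def get_dags(self, **kwargs: Any) -> Dict[str, Any]:
        dags = [{"dag_id": "example_a"}, {"dag_id": "example_b"}]
        limit = kwargs.get("limit")  # None means "no limit" in this stub
        return {"dags": dags[:limit], "total_entries": len(dags)}


def fetch_dags(
    api: FakeDagApi,
    limit: Optional[int] = None,
    paused: Optional[bool] = None,
) -> Dict[str, Any]:
    """Call the client with non-None filters, then add a UI link per DAG."""
    response = api.get_dags(**build_kwargs(limit=limit, paused=paused))
    for dag in response.get("dags", []):
        dag["ui_url"] = get_dag_url(dag["dag_id"])
    return response


Registration = Tuple[Callable[..., Any], str, str, bool]


def filter_read_only(functions: List[Registration]) -> List[Registration]:
    """Assumed behavior: keep entries whose is_read_only flag is True."""
    return [entry for entry in functions if entry[3]]


def pause_dag(dag_id: str) -> None:
    """Placeholder for a mutating tool."""


registrations: List[Registration] = [
    (fetch_dags, "fetch_dags", "Fetch all DAGs", True),
    (pause_dag, "pause_dag", "Pause a DAG by ID", False),
]

result = fetch_dags(FakeDagApi(), limit=1)
read_only_names = [name for _, name, _, _ in filter_read_only(registrations)]
print(result)
print(read_only_names)
```

Forwarding only the parameters that were actually set leaves any defaults to the underlying client, which is the same effect the chain of `is not None` checks has in the handler.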

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yangkyeongmo/mcp-server-apache-airflow'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.