Skip to main content
Glama
yangkyeongmo

MCP Server for Apache Airflow

by yangkyeongmo

fetch_dags

Retrieve Apache Airflow DAGs with filtering options for tags, status, and patterns to manage and monitor workflow automation.

Instructions

Fetch all DAGs

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
limitNo
offsetNo
order_byNo
tagsNo
only_activeNo
pausedNo
dag_id_patternNo

Implementation Reference

  • Handler function that implements the core logic of the 'fetch_dags' tool. Fetches DAGs from Airflow API with optional parameters, enhances with UI URLs, and returns as MCP TextContent.
    async def get_dags( limit: Optional[int] = None, offset: Optional[int] = None, order_by: Optional[str] = None, tags: Optional[List[str]] = None, only_active: Optional[bool] = None, paused: Optional[bool] = None, dag_id_pattern: Optional[str] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: # Build parameters dictionary kwargs: Dict[str, Any] = {} if limit is not None: kwargs["limit"] = limit if offset is not None: kwargs["offset"] = offset if order_by is not None: kwargs["order_by"] = order_by if tags is not None: kwargs["tags"] = tags if only_active is not None: kwargs["only_active"] = only_active if paused is not None: kwargs["paused"] = paused if dag_id_pattern is not None: kwargs["dag_id_pattern"] = dag_id_pattern # Use the client to fetch DAGs response = dag_api.get_dags(**kwargs) # Convert response to dictionary for easier manipulation response_dict = response.to_dict() # Add UI links to each DAG for dag in response_dict.get("dags", []): dag["ui_url"] = get_dag_url(dag["dag_id"]) return [types.TextContent(type="text", text=str(response_dict))]
  • Defines the list of tool functions for DAG API, including the registration tuple for 'fetch_dags' which associates the get_dags handler with the tool name, description, and read-only flag.
    def get_all_functions() -> list[tuple[Callable, str, str, bool]]: """Return list of (function, name, description, is_read_only) tuples for registration.""" return [ (get_dags, "fetch_dags", "Fetch all DAGs", True), (get_dag, "get_dag", "Get a DAG by ID", True), (get_dag_details, "get_dag_details", "Get a simplified representation of DAG", True), (get_dag_source, "get_dag_source", "Get a source code", True), (pause_dag, "pause_dag", "Pause a DAG by ID", False), (unpause_dag, "unpause_dag", "Unpause a DAG by ID", False), (get_dag_tasks, "get_dag_tasks", "Get tasks for DAG", True), (get_task, "get_task", "Get a task by ID", True), (get_tasks, "get_tasks", "Get tasks for DAG", True), (patch_dag, "patch_dag", "Update a DAG", False), (patch_dags, "patch_dags", "Update multiple DAGs", False), (delete_dag, "delete_dag", "Delete a DAG", False), (clear_task_instances, "clear_task_instances", "Clear a set of task instances", False), (set_task_instances_state, "set_task_instances_state", "Set a state of task instances", False), (reparse_dag_file, "reparse_dag_file", "Request re-parsing of a DAG file", False), ]
  • src/main.py:92-97 (registration)
    The code in main.py that iterates over the functions from get_dag_functions() (among others) and registers each tool with the MCP app using app.add_tool.
    if read_only: functions = filter_functions_for_read_only(functions) for func, name, description, *_ in functions: app.add_tool(Tool.from_function(func, name=name, description=description))
  • Helper function used by get_dags to generate UI URLs for each DAG.
    def get_dag_url(dag_id: str) -> str: return f"{AIRFLOW_HOST}/dags/{dag_id}/grid"
  • src/main.py:8-9 (registration)
    Import of get_all_functions from dag.py, aliased as get_dag_functions, which is used to load the tool registrations.
    from src.airflow.dag import get_all_functions as get_dag_functions from src.airflow.dagrun import get_all_functions as get_dagrun_functions

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yangkyeongmo/mcp-server-apache-airflow'

If you have feedback or need assistance with the MCP directory API, please join our Discord server