Skip to main content
Glama
nikhil-ganage

MCP Server Airflow Token

patch_dags

Update multiple DAGs in Apache Airflow by modifying pause status, tags, or filtering by DAG ID pattern to manage workflow configurations.

Instructions

Update multiple DAGs

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
dag_id_patternNo
is_pausedNo
tagsNo

Implementation Reference

  • The async handler function implementing the 'patch_dags' tool logic. It builds an update request for DAG properties (is_paused, tags) and calls the Airflow API to patch all matching DAGs based on dag_id_pattern.
    async def patch_dags( dag_id_pattern: Optional[str] = None, is_paused: Optional[bool] = None, tags: Optional[List[str]] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: update_request = {} update_mask = [] if is_paused is not None: update_request["is_paused"] = is_paused update_mask.append("is_paused") if tags is not None: update_request["tags"] = tags update_mask.append("tags") dag = DAG(**update_request) kwargs = {} if dag_id_pattern is not None: kwargs["dag_id_pattern"] = dag_id_pattern response = dag_api.patch_dags(dag_id_pattern=dag_id_pattern, dag=dag, update_mask=update_mask, **kwargs) return [types.TextContent(type="text", text=str(response.to_dict()))]
  • The get_all_functions() that includes the registration tuple for 'patch_dags': (patch_dags, "patch_dags", "Update multiple DAGs", False). This is imported and used in main.py to register the tool.
    def get_all_functions() -> list[tuple[Callable, str, str, bool]]: """Return list of (function, name, description, is_read_only) tuples for registration.""" return [ (get_dags, "fetch_dags", "Fetch all DAGs", True), (get_dag, "get_dag", "Get a DAG by ID", True), (get_dag_details, "get_dag_details", "Get a simplified representation of DAG", True), (get_dag_source, "get_dag_source", "Get a source code", True), (pause_dag, "pause_dag", "Pause a DAG by ID", False), (unpause_dag, "unpause_dag", "Unpause a DAG by ID", False), (get_dag_tasks, "get_dag_tasks", "Get tasks for DAG", True), (get_task, "get_task", "Get a task by ID", True), (get_tasks, "get_tasks", "Get tasks for DAG", True), (patch_dag, "patch_dag", "Update a DAG", False), (patch_dags, "patch_dags", "Update multiple DAGs", False), (delete_dag, "delete_dag", "Delete a DAG", False), (clear_task_instances, "clear_task_instances", "Clear a set of task instances", False), (set_task_instances_state, "set_task_instances_state", "Set a state of task instances", False), (reparse_dag_file, "reparse_dag_file", "Request re-parsing of a DAG file", False), ]
  • src/main.py:78-98 (registration)
    The main registration loop in main.py that calls get_dag_functions() (which includes patch_dags) and registers each tool with app.add_tool(func, name, description).
    for api in apis: logging.debug(f"Adding API: {api}") get_function = APITYPE_TO_FUNCTIONS[APIType(api)] try: functions = get_function() except NotImplementedError: continue # Filter functions for read-only mode if requested if read_only: functions = filter_functions_for_read_only(functions) for func, name, description, *_ in functions: app.add_tool(func, name=name, description=description) if transport == "sse": logging.debug("Starting MCP server for Apache Airflow with SSE transport") app.run(transport="sse") else: logging.debug("Starting MCP server for Apache Airflow with stdio transport") app.run(transport="stdio")
  • src/main.py:7-7 (registration)
    Import of get_all_functions from dag.py, aliased as get_dag_functions for use in tool registration.
    from src.airflow.dag import get_all_functions as get_dag_functions

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nikhil-ganage/mcp-server-airflow-token'

If you have feedback or need assistance with the MCP directory API, please join our Discord server