fetch_dags
Retrieve and list all Directed Acyclic Graphs (DAGs) from Apache Airflow with filtering options for tags, status, and patterns to manage workflow orchestration.
Instructions
Fetch all DAGs
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| limit | No | ||
| offset | No | ||
| order_by | No | ||
| tags | No | ||
| only_active | No | ||
| paused | No | ||
| dag_id_pattern | No | ||
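Since the table lists no descriptions or defaults, a sample invocation may help. The sketch below builds a JSON-RPC `tools/call` request of the kind an MCP client sends. The argument types follow the handler signature shown in the Implementation Reference; every value (`20`, `"dag_id"`, `"etl"`, `"sales"`, and so on) is a made-up illustration, not a documented default.

```python
import json

# JSON-RPC 2.0 request in the shape MCP uses for tool invocation.
# All argument values below are hypothetical examples.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "fetch_dags",
        "arguments": {
            "limit": 20,                 # int: maximum number of DAGs to return
            "offset": 0,                 # int: number of DAGs to skip
            "order_by": "dag_id",        # str: field to sort by (assumed field name)
            "tags": ["etl"],             # list of str: tag filter
            "only_active": True,         # bool
            "paused": False,             # bool
            "dag_id_pattern": "sales",   # str: pattern matched against DAG IDs
        },
    },
}

print(json.dumps(request, indent=2))
```

All seven arguments are optional, so an empty `arguments` object is also a valid call.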
Implementation Reference
- src/airflow/dag.py:40-77 (handler) Handler function `get_dags` that implements the core logic of the `fetch_dags` tool: accepts optional query parameters, calls Airflow's DAGApi.get_dags, enhances the response with UI URLs, and returns it as MCP TextContent.

  ```python
  async def get_dags(
      limit: Optional[int] = None,
      offset: Optional[int] = None,
      order_by: Optional[str] = None,
      tags: Optional[List[str]] = None,
      only_active: Optional[bool] = None,
      paused: Optional[bool] = None,
      dag_id_pattern: Optional[str] = None,
  ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
      # Build parameters dictionary
      kwargs: Dict[str, Any] = {}
      if limit is not None:
          kwargs["limit"] = limit
      if offset is not None:
          kwargs["offset"] = offset
      if order_by is not None:
          kwargs["order_by"] = order_by
      if tags is not None:
          kwargs["tags"] = tags
      if only_active is not None:
          kwargs["only_active"] = only_active
      if paused is not None:
          kwargs["paused"] = paused
      if dag_id_pattern is not None:
          kwargs["dag_id_pattern"] = dag_id_pattern

      # Use the client to fetch DAGs
      response = dag_api.get_dags(**kwargs)

      # Convert response to dictionary for easier manipulation
      response_dict = response.to_dict()

      # Add UI links to each DAG
      for dag in response_dict.get("dags", []):
          dag["ui_url"] = get_dag_url(dag["dag_id"])

      return [types.TextContent(type="text", text=str(response_dict))]
  ```
- src/airflow/dag.py:15-33 (registration) Module-level `get_all_functions` that registers the `fetch_dags` tool by including its tuple (get_dags, "fetch_dags", "Fetch all DAGs", True). This is imported and used in src/main.py to add the tool to the MCP server.

  ```python
  def get_all_functions() -> list[tuple[Callable, str, str, bool]]:
      """Return list of (function, name, description, is_read_only) tuples for registration."""
      return [
          (get_dags, "fetch_dags", "Fetch all DAGs", True),
          (get_dag, "get_dag", "Get a DAG by ID", True),
          (get_dag_details, "get_dag_details", "Get a simplified representation of DAG", True),
          (get_dag_source, "get_dag_source", "Get a source code", True),
          (pause_dag, "pause_dag", "Pause a DAG by ID", False),
          (unpause_dag, "unpause_dag", "Unpause a DAG by ID", False),
          (get_dag_tasks, "get_dag_tasks", "Get tasks for DAG", True),
          (get_task, "get_task", "Get a task by ID", True),
          (get_tasks, "get_tasks", "Get tasks for DAG", True),
          (patch_dag, "patch_dag", "Update a DAG", False),
          (patch_dags, "patch_dags", "Update multiple DAGs", False),
          (delete_dag, "delete_dag", "Delete a DAG", False),
          (clear_task_instances, "clear_task_instances", "Clear a set of task instances", False),
          (set_task_instances_state, "set_task_instances_state", "Set a state of task instances", False),
          (reparse_dag_file, "reparse_dag_file", "Request re-parsing of a DAG file", False),
      ]
  ```
- src/main.py:90-92 (registration) Global tool registration loop in main.py that calls `app.add_tool` for each function returned by an imported module's `get_all_functions`, including `fetch_dags` from dag.py (imported at line 7).

  ```python
  for func, name, description, *_ in functions:
      app.add_tool(func, name=name, description=description)
  ```
- src/airflow/dag.py:36-38 (helper) Helper function `get_dag_url` used by `get_dags` to add UI links to each DAG in the response.

  ```python
  def get_dag_url(dag_id: str) -> str:
      return f"{AIRFLOW_HOST}/dags/{dag_id}/grid"
  ```
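To see how the pieces referenced above fit together, here is a minimal, self-contained sketch of the same flow: drop unset parameters, call the client, attach a UI URL to each DAG, and register the function from a four-field tuple. It is an illustration under stated assumptions, not the project's code. `StubDagApi`, `get_dags_sketch`, the `registered` dictionary, and the `AIRFLOW_HOST` value are hypothetical stand-ins, and the stub returns a plain dictionary where the real client returns an object with `to_dict()`.

```python
import asyncio
from typing import Any, Dict, List, Optional

AIRFLOW_HOST = "http://localhost:8080"  # assumed value; the real one comes from configuration


class StubDagApi:
    """Hypothetical stand-in for the Airflow DAG client; records the kwargs it receives."""

    def __init__(self) -> None:
        self.last_kwargs: Dict[str, Any] = {}

    def get_dags(self, **kwargs: Any) -> Dict[str, Any]:
        self.last_kwargs = kwargs
        return {"dags": [{"dag_id": "example_etl"}], "total_entries": 1}


dag_api = StubDagApi()


def get_dag_url(dag_id: str) -> str:
    return f"{AIRFLOW_HOST}/dags/{dag_id}/grid"


async def get_dags_sketch(
    limit: Optional[int] = None,
    paused: Optional[bool] = None,
    tags: Optional[List[str]] = None,
) -> Dict[str, Any]:
    # Forward only the arguments the caller set. Testing `is not None`
    # (rather than truthiness) keeps falsy values such as paused=False.
    candidates = {"limit": limit, "paused": paused, "tags": tags}
    kwargs = {name: value for name, value in candidates.items() if value is not None}

    response = dag_api.get_dags(**kwargs)

    # Attach a UI link to every DAG in the response.
    for dag in response.get("dags", []):
        dag["ui_url"] = get_dag_url(dag["dag_id"])
    return response


# Registration tuples carry four fields; `*_` absorbs the trailing read-only flag.
functions = [(get_dags_sketch, "fetch_dags", "Fetch all DAGs", True)]
registered = {}
for func, name, description, *_ in functions:
    registered[name] = (func, description)

result = asyncio.run(registered["fetch_dags"][0](paused=False))
print(dag_api.last_kwargs)           # {'paused': False}
print(result["dags"][0]["ui_url"])   # http://localhost:8080/dags/example_etl/grid
```

Note that `paused=False` reaches the client while the unset `limit` and `tags` do not, which is why the handler tests each parameter against `None` instead of relying on truthiness.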