list_dags

Retrieve and filter DAGs from Apache Airflow clusters with pagination support for efficient workflow management.

Instructions

[Tool Role]: Lists all DAGs registered in the Airflow cluster with pagination support.

Args:
  • limit: Maximum number of DAGs to return (default: 20)
  • offset: Number of DAGs to skip for pagination (default: 0)
  • fetch_all: If True, fetches all DAGs regardless of limit/offset
  • id_contains: Filter DAGs by ID containing this string
  • name_contains: Filter DAGs by display name containing this string

Returns: Dict containing dags list, pagination info, and total counts
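
For orientation, the dictionary returned by a default-pagination call has the shape sketched below. Every value is illustrative, made up for the example; it is not output from a real cluster.

    # Illustrative shape of the value returned by list_dags(limit=20, offset=0);
    # all values here are invented for the example.
    example_result = {
        "dags": [
            {
                "dag_id": "etl_daily",
                "dag_display_name": "ETL Daily",
                "is_active": True,
                "is_paused": False,
                "schedule_interval": "0 2 * * *",
                "owners": ["data-team"],
                "tags": ["etl"],
                # ... remaining fields as listed under Implementation Reference
            }
        ],
        "total_entries": 42,        # DAGs known to the Airflow API
        "limit": 20,
        "offset": 0,
        "returned_count": 1,        # count after client-side filtering
        "has_more_pages": True,     # (0 + 20) < 42
        "next_offset": 20,
        "pagination_info": {
            "current_page": 1,
            "total_pages": 3,       # ((42 - 1) // 20) + 1
            "remaining_count": 41,  # max(0, 42 - (0 + 1))
        },
    }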

Input Schema

Name          | Required | Description                                          | Default
limit         | No       | Maximum number of DAGs to return                     | 20
offset        | No       | Number of DAGs to skip for pagination                | 0
fetch_all     | No       | If True, fetches all DAGs regardless of limit/offset | False
id_contains   | No       | Filter DAGs by ID containing this string             | None
name_contains | No       | Filter DAGs by display name containing this string   | None
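
The table corresponds to an input JSON Schema roughly like the one below, expressed here as a Python dict. This is a sketch reconstructed from the function signature; the actual schema is generated by the MCP framework and may differ in detail.

    # Approximate input schema for list_dags, reconstructed from the Python
    # signature shown under Implementation Reference (illustrative only).
    LIST_DAGS_INPUT_SCHEMA = {
        "type": "object",
        "properties": {
            "limit": {"type": "integer", "default": 20},
            "offset": {"type": "integer", "default": 0},
            "fetch_all": {"type": "boolean", "default": False},
            "id_contains": {"type": "string"},
            "name_contains": {"type": "string"},
        },
        "required": [],
    }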

Implementation Reference

  • The primary handler function for the 'list_dags' MCP tool. Decorated with @mcp.tool() for automatic registration. Delegates execution to the internal helper function.
    async def list_dags(
        limit: int = 20,
        offset: int = 0,
        fetch_all: bool = False,
        id_contains: Optional[str] = None,
        name_contains: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        [Tool Role]: Lists all DAGs registered in the Airflow cluster with pagination support.

        Args:
            limit: Maximum number of DAGs to return (default: 20)
            offset: Number of DAGs to skip for pagination (default: 0)
            fetch_all: If True, fetches all DAGs regardless of limit/offset
            id_contains: Filter DAGs by ID containing this string
            name_contains: Filter DAGs by display name containing this string

        Returns:
            Dict containing dags list, pagination info, and total counts
        """
        return await list_dags_internal(limit, offset, fetch_all, id_contains, name_contains)
  • Core helper function implementing the DAG listing logic: makes Airflow API request to /dags, processes response, applies client-side filtering, handles pagination (including fetch_all mode), and formats the output.
    async def list_dags_internal(
        limit: int = 20,
        offset: int = 0,
        fetch_all: bool = False,
        id_contains: Optional[str] = None,
        name_contains: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Internal helper function to list DAGs.
        This function contains the actual implementation logic that can be reused.
        """
        # Helper: client-side filtering by ID and display name
        def _filter_dags(dag_list):
            results = dag_list
            if id_contains:
                key = id_contains.lower()
                results = [d for d in results if key in d.get("dag_id", "").lower()]
            if name_contains:
                key = name_contains.lower()
                results = [d for d in results if key in (d.get("dag_display_name") or "").lower()]
            return results

        # If fetch_all=True, loop through pages to collect all DAGs
        if fetch_all:
            all_dags = []
            current_offset = offset
            total_entries = None
            pages_fetched = 0
            while True:
                # recursive call without fetch_all to fetch one page
                result = await list_dags_internal(limit=limit, offset=current_offset)
                page_dags = result.get("dags", [])
                all_dags.extend(page_dags)
                pages_fetched += 1
                total_entries = result.get("total_entries", 0)
                if not result.get("has_more_pages", False) or not page_dags:
                    break
                current_offset = result.get("next_offset", current_offset + limit)
            # apply filters
            filtered = _filter_dags(all_dags)
            return {
                "dags": filtered,
                "total_entries": len(filtered),
                "pages_fetched": pages_fetched,
                "limit": limit,
                "offset": offset
            }

        # Default: paginated fetch
        params = []
        params.append(f"limit={limit}")
        if offset > 0:
            params.append(f"offset={offset}")
        query_string = "&".join(params) if params else ""
        endpoint = f"/dags?{query_string}" if query_string else "/dags"
        resp = await airflow_request("GET", endpoint)
        resp.raise_for_status()
        response_data = resp.json()
        dags = response_data.get("dags", [])

        dag_list = []
        for dag in dags:
            # Extract schedule interval info
            schedule_info = dag.get("schedule_interval")
            if isinstance(schedule_info, dict) and schedule_info.get("__type") == "CronExpression":
                schedule_display = schedule_info.get("value", "Unknown")
            elif schedule_info:
                schedule_display = str(schedule_info)
            else:
                schedule_display = None

            dag_info = {
                "dag_id": dag.get("dag_id"),
                "dag_display_name": dag.get("dag_display_name"),
                "description": dag.get("description"),
                "is_active": dag.get("is_active"),
                "is_paused": dag.get("is_paused"),
                "schedule_interval": schedule_display,
                "max_active_runs": dag.get("max_active_runs"),
                "max_active_tasks": dag.get("max_active_tasks"),
                "owners": dag.get("owners"),
                "tags": [t.get("name") for t in dag.get("tags", [])],
                "next_dagrun": dag.get("next_dagrun"),
                "next_dagrun_data_interval_start": dag.get("next_dagrun_data_interval_start"),
                "next_dagrun_data_interval_end": dag.get("next_dagrun_data_interval_end"),
                "last_parsed_time": dag.get("last_parsed_time"),
                "has_import_errors": dag.get("has_import_errors"),
                "has_task_concurrency_limits": dag.get("has_task_concurrency_limits"),
                "timetable_description": dag.get("timetable_description"),
                "fileloc": dag.get("fileloc"),
                "file_token": dag.get("file_token")
            }
            dag_list.append(dag_info)

        # Calculate pagination info and apply filters
        total_entries = response_data.get("total_entries", len(dag_list))
        has_more_pages = (offset + limit) < total_entries
        next_offset = offset + limit if has_more_pages else None
        filtered = _filter_dags(dag_list)
        returned_count = len(filtered)

        return {
            "dags": filtered,
            "total_entries": total_entries,
            "limit": limit,
            "offset": offset,
            "returned_count": returned_count,
            "has_more_pages": has_more_pages,
            "next_offset": next_offset,
            "pagination_info": {
                "current_page": (offset // limit) + 1 if limit > 0 else 1,
                "total_pages": ((total_entries - 1) // limit) + 1 if limit > 0 and total_entries > 0 else 1,
                "remaining_count": max(0, total_entries - (offset + returned_count))
            }
        }
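  • To make the pagination arithmetic above concrete, here is the same calculation pulled out with sample numbers. The values are illustrative, not real cluster data; they assume no client-side filters, so returned_count equals the page size actually fetched.

    # Worked example of the pagination_info arithmetic, with illustrative
    # numbers: 53 DAGs total, page size 20, third (final) page at offset 40.
    total_entries, limit, offset, returned_count = 53, 20, 40, 13

    has_more_pages = (offset + limit) < total_entries              # 60 < 53 -> False
    next_offset = offset + limit if has_more_pages else None       # None
    current_page = (offset // limit) + 1                           # 3
    total_pages = ((total_entries - 1) // limit) + 1               # 3
    remaining = max(0, total_entries - (offset + returned_count))  # 0

    assert (has_more_pages, next_offset, current_page, total_pages, remaining) == \
        (False, None, 3, 3, 0)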
  • v1 API registration: Calls register_common_tools which defines and registers the list_dags handler using @mcp.tool().
    common_tools.register_common_tools(mcp)
  • v2 API registration: Calls register_common_tools which defines and registers the list_dags handler using @mcp.tool().
    common_tools.register_common_tools(mcp)
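  • The two registration bullets above reference register_common_tools. A minimal sketch of that pattern, assuming a FastMCP-style server object, might look like the following; the body is illustrative, not the project's exact source.

    from typing import Any, Dict, Optional

    def register_common_tools(mcp) -> None:
        """Illustrative sketch: define the handler and register it on the
        given MCP server via the @mcp.tool() decorator."""

        @mcp.tool()
        async def list_dags(
            limit: int = 20,
            offset: int = 0,
            fetch_all: bool = False,
            id_contains: Optional[str] = None,
            name_contains: Optional[str] = None,
        ) -> Dict[str, Any]:
            # Delegates to the internal helper shown above.
            return await list_dags_internal(
                limit, offset, fetch_all, id_contains, name_contains
            )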
