
list_dags

Retrieve and filter DAGs from Apache Airflow clusters with pagination support for efficient workflow management.

Instructions

[Tool Role]: Lists all DAGs registered in the Airflow cluster with pagination support.

Args:
    limit: Maximum number of DAGs to return (default: 20)
    offset: Number of DAGs to skip for pagination (default: 0)
    fetch_all: If True, fetches all DAGs regardless of limit/offset
    id_contains: Filter DAGs by ID containing this string
    name_contains: Filter DAGs by display name containing this string

Returns: Dict containing dags list, pagination info, and total counts
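
For orientation, a sketch of the returned structure with illustrative values (the exact keys come from the implementation shown below; values here are examples, not real data):

    # Illustrative response shape; values are examples only
    example_response = {
        "dags": [
            {"dag_id": "example_etl", "dag_display_name": "Example ETL",
             "is_paused": False, "schedule_interval": "0 2 * * *"},
        ],
        "total_entries": 42,   # total DAGs reported by the Airflow API
        "limit": 20,
        "offset": 0,
        "returned_count": 1,   # DAGs in this page after client-side filtering
        "has_more_pages": True,
        "next_offset": 20,
        "pagination_info": {"current_page": 1, "total_pages": 3, "remaining_count": 41},
    }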

Input Schema

Name            Required   Description                                            Default
limit           No         Maximum number of DAGs to return                       20
offset          No         Number of DAGs to skip for pagination                  0
fetch_all       No         If True, fetches all DAGs regardless of limit/offset   False
id_contains     No         Filter DAGs by ID containing this string               None
name_contains   No         Filter DAGs by display name containing this string     None
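
For example, a client might pass arguments like these (hypothetical values) to fetch the second page of DAGs whose IDs contain "etl":

    # Example tool arguments; all fields are optional
    arguments = {
        "limit": 20,
        "offset": 20,          # skip the first 20 DAGs
        "id_contains": "etl",  # keep only DAGs whose dag_id contains "etl"
    }

Note that id_contains and name_contains are applied after the page is fetched, so a filtered page may contain fewer than limit entries.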

Implementation Reference

  • The primary handler function for the 'list_dags' MCP tool. Decorated with @mcp.tool() for automatic registration. Delegates execution to the internal helper function; a usage sketch appears after this list.
    from typing import Any, Dict, Optional  # typing imports used by these excerpts

    async def list_dags(limit: int = 20,
                        offset: int = 0,
                        fetch_all: bool = False,
                        id_contains: Optional[str] = None,
                        name_contains: Optional[str] = None) -> Dict[str, Any]:
        """
        [Tool Role]: Lists all DAGs registered in the Airflow cluster with pagination support.
    
        Args:
            limit: Maximum number of DAGs to return (default: 20)
            offset: Number of DAGs to skip for pagination (default: 0)
            fetch_all: If True, fetches all DAGs regardless of limit/offset
            id_contains: Filter DAGs by ID containing this string
            name_contains: Filter DAGs by display name containing this string
    
        Returns:
            Dict containing dags list, pagination info, and total counts
        """
        return await list_dags_internal(limit, offset, fetch_all, id_contains, name_contains)
  • Core helper function implementing the DAG listing logic: makes an Airflow API request to /dags, processes the response, applies client-side filtering, handles pagination (including fetch_all mode), and formats the output; a pagination sketch appears after this list.
    async def list_dags_internal(limit: int = 20,
                                 offset: int = 0,
                                 fetch_all: bool = False,
                                 id_contains: Optional[str] = None,
                                 name_contains: Optional[str] = None) -> Dict[str, Any]:
        """
        Internal helper function to list DAGs.
        This function contains the actual implementation logic that can be reused.
        """
        # Helper: client-side filtering by ID and display name
        def _filter_dags(dag_list):
            results = dag_list
            if id_contains:
                key = id_contains.lower()
                results = [d for d in results if key in d.get("dag_id", "").lower()]
            if name_contains:
                key = name_contains.lower()
                results = [d for d in results if key in (d.get("dag_display_name") or "").lower()]
            return results
    
        # If fetch_all=True, loop through pages to collect all DAGs
        if fetch_all:
            all_dags = []
            current_offset = offset
            total_entries = None
            pages_fetched = 0
            while True:
                # recursive call without fetch_all to fetch one page
                result = await list_dags_internal(limit=limit, offset=current_offset)
                page_dags = result.get("dags", [])
                all_dags.extend(page_dags)
                pages_fetched += 1
                total_entries = result.get("total_entries", 0)
                if not result.get("has_more_pages", False) or not page_dags:
                    break
                current_offset = result.get("next_offset", current_offset + limit)
            # apply filters
            filtered = _filter_dags(all_dags)
            return {
                "dags": filtered,
                "total_entries": len(filtered),
                "pages_fetched": pages_fetched,
                "limit": limit,
                "offset": offset
            }
        # Default: paginated fetch
        params = []
        params.append(f"limit={limit}")
        if offset > 0:
            params.append(f"offset={offset}")
        
        query_string = "&".join(params) if params else ""
        endpoint = f"/dags?{query_string}" if query_string else "/dags"
        
        resp = await airflow_request("GET", endpoint)
        resp.raise_for_status()
        response_data = resp.json()
        dags = response_data.get("dags", [])
        dag_list = []
        for dag in dags:
            # Extract schedule interval info
            schedule_info = dag.get("schedule_interval")
            if isinstance(schedule_info, dict) and schedule_info.get("__type") == "CronExpression":
                schedule_display = schedule_info.get("value", "Unknown")
            elif schedule_info:
                schedule_display = str(schedule_info)
            else:
                schedule_display = None
            
            dag_info = {
                "dag_id": dag.get("dag_id"),
                "dag_display_name": dag.get("dag_display_name"),
                "description": dag.get("description"),
                "is_active": dag.get("is_active"),
                "is_paused": dag.get("is_paused"),
                "schedule_interval": schedule_display,
                "max_active_runs": dag.get("max_active_runs"),
                "max_active_tasks": dag.get("max_active_tasks"),
                "owners": dag.get("owners"),
                "tags": [t.get("name") for t in dag.get("tags", [])],
                "next_dagrun": dag.get("next_dagrun"),
                "next_dagrun_data_interval_start": dag.get("next_dagrun_data_interval_start"),
                "next_dagrun_data_interval_end": dag.get("next_dagrun_data_interval_end"),
                "last_parsed_time": dag.get("last_parsed_time"),
                "has_import_errors": dag.get("has_import_errors"),
                "has_task_concurrency_limits": dag.get("has_task_concurrency_limits"),
                "timetable_description": dag.get("timetable_description"),
                "fileloc": dag.get("fileloc"),
                "file_token": dag.get("file_token")
            }
            dag_list.append(dag_info)
        
        # Calculate pagination info and apply filters
        total_entries = response_data.get("total_entries", len(dag_list))
        has_more_pages = (offset + limit) < total_entries
        next_offset = offset + limit if has_more_pages else None
        filtered = _filter_dags(dag_list)
        returned_count = len(filtered)
        
        return {
            "dags": filtered,
            "total_entries": total_entries,
            "limit": limit,
            "offset": offset,
            "returned_count": returned_count,
            "has_more_pages": has_more_pages,
            "next_offset": next_offset,
            "pagination_info": {
                "current_page": (offset // limit) + 1 if limit > 0 else 1,
                "total_pages": ((total_entries - 1) // limit) + 1 if limit > 0 and total_entries > 0 else 1,
                "remaining_count": max(0, total_entries - (offset + returned_count))
            }
        }
  • v1 API registration: Calls register_common_tools which defines and registers the list_dags handler using @mcp.tool().
    common_tools.register_common_tools(mcp)
  • v2 API registration: Calls register_common_tools which defines and registers the list_dags handler using @mcp.tool().
    common_tools.register_common_tools(mcp)
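
A minimal usage sketch for the list_dags handler above, assuming the module's typing imports and a configured airflow_request() helper are in scope ("etl" is a hypothetical filter value):

    import asyncio

    async def demo():
        # Fetch up to 5 DAGs whose dag_id contains "etl"
        result = await list_dags(limit=5, id_contains="etl")
        for dag in result["dags"]:
            print(dag["dag_id"], "paused" if dag["is_paused"] else "active")

    asyncio.run(demo())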
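
When fetch_all is not used, a caller can page through results by following the has_more_pages and next_offset keys returned by list_dags_internal; a sketch:

    async def collect_all_dag_ids(page_size: int = 100) -> list:
        # Walk pages using the pagination keys in each response.
        dag_ids, offset = [], 0
        while True:
            page = await list_dags_internal(limit=page_size, offset=offset)
            dag_ids.extend(d["dag_id"] for d in page["dags"])
            if not page["has_more_pages"]:
                return dag_ids
            offset = page["next_offset"]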
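
The v1 and v2 registration calls above both rely on the @mcp.tool() decorator pattern visible in the handler; a minimal sketch of what register_common_tools might look like (the exact module layout is an assumption, and the same typing imports are assumed in scope):

    def register_common_tools(mcp):
        # Sketch only: the real definition lives in the common_tools module.
        @mcp.tool()
        async def list_dags(limit: int = 20,
                            offset: int = 0,
                            fetch_all: bool = False,
                            id_contains: Optional[str] = None,
                            name_contains: Optional[str] = None) -> Dict[str, Any]:
            """Lists all DAGs registered in the Airflow cluster with pagination support."""
            return await list_dags_internal(limit, offset, fetch_all,
                                            id_contains, name_contains)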
