
list_dags

Retrieve and filter DAGs from Apache Airflow clusters with pagination support for efficient workflow management.

Instructions

[Tool Role]: Lists all DAGs registered in the Airflow cluster with pagination support.

Args:
  limit: Maximum number of DAGs to return (default: 20)
  offset: Number of DAGs to skip for pagination (default: 0)
  fetch_all: If True, fetches all DAGs regardless of limit/offset
  id_contains: Filter DAGs by ID containing this string
  name_contains: Filter DAGs by display name containing this string

Returns: Dict containing dags list, pagination info, and total counts
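For illustration, a hedged sketch of the shape the returned dict takes. The field names are taken from the implementation shown further down this page; the values here are entirely hypothetical:

```python
# Illustrative only: a response shaped like the dict list_dags returns.
# DAG entries, counts, and offsets below are hypothetical sample values.
example_response = {
    "dags": [
        {"dag_id": "etl_daily", "dag_display_name": "ETL Daily", "is_paused": False}
    ],
    "total_entries": 42,
    "limit": 20,
    "offset": 0,
    "returned_count": 1,
    "has_more_pages": True,
    "next_offset": 20,
    "pagination_info": {"current_page": 1, "total_pages": 3, "remaining_count": 41},
}

# Sanity check: has_more_pages is derived from offset + limit vs. total_entries.
assert example_response["has_more_pages"] == (
    example_response["offset"] + example_response["limit"]
    < example_response["total_entries"]
)
```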

Input Schema

Name           Required  Description  Default
limit          No        -            20
offset         No        -            0
fetch_all      No        -            false
id_contains    No        -            -
name_contains  No        -            -

Output Schema

No fields defined; see the Returns description above for the structure of the result dict.

Implementation Reference

  • The primary handler function for the 'list_dags' MCP tool. Decorated with @mcp.tool() for automatic registration. Delegates execution to the internal helper function.
    async def list_dags(limit: int = 20,
                  offset: int = 0,
                  fetch_all: bool = False,
                  id_contains: Optional[str] = None,
                  name_contains: Optional[str] = None) -> Dict[str, Any]:
        """
        [Tool Role]: Lists all DAGs registered in the Airflow cluster with pagination support.
    
        Args:
            limit: Maximum number of DAGs to return (default: 20)
            offset: Number of DAGs to skip for pagination (default: 0)
            fetch_all: If True, fetches all DAGs regardless of limit/offset
            id_contains: Filter DAGs by ID containing this string
            name_contains: Filter DAGs by display name containing this string
    
        Returns:
            Dict containing dags list, pagination info, and total counts
        """
        return await list_dags_internal(limit, offset, fetch_all, id_contains, name_contains)
  • Core helper function implementing the DAG listing logic: makes Airflow API request to /dags, processes response, applies client-side filtering, handles pagination (including fetch_all mode), and formats the output.
    async def list_dags_internal(limit: int = 20,
                          offset: int = 0,
                          fetch_all: bool = False,
                          id_contains: Optional[str] = None,
                          name_contains: Optional[str] = None) -> Dict[str, Any]:
        """
        Internal helper function to list DAGs.
        This function contains the actual implementation logic that can be reused.
        """
        # Helper: client-side filtering by ID and display name
        def _filter_dags(dag_list):
            results = dag_list
            if id_contains:
                key = id_contains.lower()
                results = [d for d in results if key in d.get("dag_id", "").lower()]
            if name_contains:
                key = name_contains.lower()
                results = [d for d in results if key in (d.get("dag_display_name") or "").lower()]
            return results
    
        # If fetch_all=True, loop through pages to collect all DAGs
        if fetch_all:
            all_dags = []
            current_offset = offset
            total_entries = None
            pages_fetched = 0
            while True:
                # recursive call without fetch_all to fetch one page
                result = await list_dags_internal(limit=limit, offset=current_offset)
                page_dags = result.get("dags", [])
                all_dags.extend(page_dags)
                pages_fetched += 1
                total_entries = result.get("total_entries", 0)
                if not result.get("has_more_pages", False) or not page_dags:
                    break
                current_offset = result.get("next_offset", current_offset + limit)
            # apply filters
            filtered = _filter_dags(all_dags)
            return {
                "dags": filtered,
                "total_entries": len(filtered),
                "pages_fetched": pages_fetched,
                "limit": limit,
                "offset": offset
            }
        # Default: paginated fetch
        params = []
        params.append(f"limit={limit}")
        if offset > 0:
            params.append(f"offset={offset}")
        
        query_string = "&".join(params) if params else ""
        endpoint = f"/dags?{query_string}" if query_string else "/dags"
        
        resp = await airflow_request("GET", endpoint)
        resp.raise_for_status()
        response_data = resp.json()
        dags = response_data.get("dags", [])
        dag_list = []
        for dag in dags:
            # Extract schedule interval info
            schedule_info = dag.get("schedule_interval")
            if isinstance(schedule_info, dict) and schedule_info.get("__type") == "CronExpression":
                schedule_display = schedule_info.get("value", "Unknown")
            elif schedule_info:
                schedule_display = str(schedule_info)
            else:
                schedule_display = None
            
            dag_info = {
                "dag_id": dag.get("dag_id"),
                "dag_display_name": dag.get("dag_display_name"),
                "description": dag.get("description"),
                "is_active": dag.get("is_active"),
                "is_paused": dag.get("is_paused"),
                "schedule_interval": schedule_display,
                "max_active_runs": dag.get("max_active_runs"),
                "max_active_tasks": dag.get("max_active_tasks"),
                "owners": dag.get("owners"),
                "tags": [t.get("name") for t in dag.get("tags", [])],
                "next_dagrun": dag.get("next_dagrun"),
                "next_dagrun_data_interval_start": dag.get("next_dagrun_data_interval_start"),
                "next_dagrun_data_interval_end": dag.get("next_dagrun_data_interval_end"),
                "last_parsed_time": dag.get("last_parsed_time"),
                "has_import_errors": dag.get("has_import_errors"),
                "has_task_concurrency_limits": dag.get("has_task_concurrency_limits"),
                "timetable_description": dag.get("timetable_description"),
                "fileloc": dag.get("fileloc"),
                "file_token": dag.get("file_token")
            }
            dag_list.append(dag_info)
        
        # Calculate pagination info and apply filters
        total_entries = response_data.get("total_entries", len(dag_list))
        has_more_pages = (offset + limit) < total_entries
        next_offset = offset + limit if has_more_pages else None
        filtered = _filter_dags(dag_list)
        returned_count = len(filtered)
        
        return {
            "dags": filtered,
            "total_entries": total_entries,
            "limit": limit,
            "offset": offset,
            "returned_count": returned_count,
            "has_more_pages": has_more_pages,
            "next_offset": next_offset,
            "pagination_info": {
                "current_page": (offset // limit) + 1 if limit > 0 else 1,
                "total_pages": ((total_entries - 1) // limit) + 1 if limit > 0 and total_entries > 0 else 1,
                "remaining_count": max(0, total_entries - (offset + returned_count))
            }
        }
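The pagination arithmetic at the end of the helper can be isolated for clarity. The sketch below is a standalone replica of that calculation (the function name pagination_info is ours, not part of the implementation):

```python
def pagination_info(total_entries: int, limit: int, offset: int,
                    returned_count: int) -> dict:
    """Standalone replica of the pagination arithmetic in list_dags_internal."""
    has_more_pages = (offset + limit) < total_entries
    return {
        "has_more_pages": has_more_pages,
        "next_offset": offset + limit if has_more_pages else None,
        "current_page": (offset // limit) + 1 if limit > 0 else 1,
        "total_pages": ((total_entries - 1) // limit) + 1
                       if limit > 0 and total_entries > 0 else 1,
        "remaining_count": max(0, total_entries - (offset + returned_count)),
    }

# Example: 45 DAGs total, second page of 20 -> one page of 5 remains.
info = pagination_info(total_entries=45, limit=20, offset=20, returned_count=20)
print(info)  # current_page=2, total_pages=3, next_offset=40, remaining_count=5
```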
  • v1 API registration: Calls register_common_tools which defines and registers the list_dags handler using @mcp.tool().
    common_tools.register_common_tools(mcp)
  • v2 API registration: Calls register_common_tools which defines and registers the list_dags handler using @mcp.tool().
    common_tools.register_common_tools(mcp)
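To illustrate the client-side filtering the helper applies after fetching, here is a standalone sketch mirroring _filter_dags; the sample DAG entries are hypothetical:

```python
def filter_dags(dag_list, id_contains=None, name_contains=None):
    """Case-insensitive substring filtering, mirroring _filter_dags above."""
    results = dag_list
    if id_contains:
        key = id_contains.lower()
        results = [d for d in results if key in d.get("dag_id", "").lower()]
    if name_contains:
        key = name_contains.lower()
        # Display name may be None, so fall back to an empty string.
        results = [d for d in results
                   if key in (d.get("dag_display_name") or "").lower()]
    return results

dags = [
    {"dag_id": "etl_daily", "dag_display_name": "ETL Daily"},
    {"dag_id": "report_weekly", "dag_display_name": None},
]
print(filter_dags(dags, id_contains="ETL"))  # matches case-insensitively
```

Note that because matching is case-insensitive substring containment, a filter like id_contains="ETL" matches "etl_daily" even though the cases differ.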
Behavior: 3/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries full burden. It discloses pagination behavior and filtering capabilities, which is helpful. However, it doesn't mention performance characteristics, rate limits, authentication requirements, or error conditions that would be valuable for a list operation.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

Well-structured with clear sections for tool role, arguments, and returns. The information is front-loaded and efficiently presented. Minor deduction because the '[Tool Role]' prefix is slightly redundant with the tool name.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 4/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's moderate complexity (5 parameters, no annotations), the description provides good coverage of inputs and behavior. The existence of an output schema means return values don't need explanation. It could be more complete by addressing sibling tool relationships and performance considerations.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 5/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

With 0% schema description coverage, the description fully compensates by clearly explaining all 5 parameters with their purposes and defaults. It adds meaningful context beyond the bare schema, including how 'fetch_all' overrides 'limit/offset' and what the filter parameters do.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the specific action ('Lists all DAGs registered in the Airflow cluster') and resource ('DAGs'), distinguishing it from siblings like 'get_dag' (single DAG) or 'failed_dags' (subset). The explicit mention of 'pagination support' adds useful scope information.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No guidance is provided on when to use this tool versus alternatives like 'get_dags_detailed_batch', 'failed_dags', or 'running_dags'. The description mentions pagination but doesn't explain trade-offs or when to choose this over other listing tools.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
