
MCP Server Airflow Token

get_dag_runs_batch

Retrieve multiple Airflow DAG runs in bulk with filters for DAG IDs, date ranges, states, and pagination to manage workflow execution data efficiently.

Instructions

List DAG runs (batch)

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| dag_ids | No | | |
| execution_date_gte | No | | |
| execution_date_lte | No | | |
| start_date_gte | No | | |
| start_date_lte | No | | |
| end_date_gte | No | | |
| end_date_lte | No | | |
| state | No | | |
| order_by | No | | |
| page_offset | No | | |
| page_limit | No | | |

Implementation Reference

  • The core handler function implementing the get_dag_runs_batch tool. It constructs a request from parameters, calls the Airflow DAGRunApi.get_dag_runs_batch, enhances the response with UI links using get_dag_run_url, and returns formatted text content.
    async def get_dag_runs_batch(
        dag_ids: Optional[List[str]] = None,
        execution_date_gte: Optional[str] = None,
        execution_date_lte: Optional[str] = None,
        start_date_gte: Optional[str] = None,
        start_date_lte: Optional[str] = None,
        end_date_gte: Optional[str] = None,
        end_date_lte: Optional[str] = None,
        state: Optional[List[str]] = None,
        order_by: Optional[str] = None,
        page_offset: Optional[int] = None,
        page_limit: Optional[int] = None,
    ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
        # Build request dictionary
        request: Dict[str, Any] = {}
        if dag_ids is not None:
            request["dag_ids"] = dag_ids
        if execution_date_gte is not None:
            request["execution_date_gte"] = execution_date_gte
        if execution_date_lte is not None:
            request["execution_date_lte"] = execution_date_lte
        if start_date_gte is not None:
            request["start_date_gte"] = start_date_gte
        if start_date_lte is not None:
            request["start_date_lte"] = start_date_lte
        if end_date_gte is not None:
            request["end_date_gte"] = end_date_gte
        if end_date_lte is not None:
            request["end_date_lte"] = end_date_lte
        if state is not None:
            request["state"] = state
        if order_by is not None:
            request["order_by"] = order_by
        if page_offset is not None:
            request["page_offset"] = page_offset
        if page_limit is not None:
            request["page_limit"] = page_limit
    
        response = dag_run_api.get_dag_runs_batch(list_dag_runs_form=request)
    
        # Convert response to dictionary for easier manipulation
        response_dict = response.to_dict()
    
        # Add UI links to each DAG run
        for dag_run in response_dict.get("dag_runs", []):
            dag_run["ui_url"] = get_dag_run_url(dag_run["dag_id"], dag_run["dag_run_id"])
    
        return [types.TextContent(type="text", text=str(response_dict))]
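The handler above builds its request dict by checking each optional parameter for `None` before including it, so the Airflow API receives only the filters a caller actually supplied. That pattern can be sketched compactly as a standalone helper (`build_batch_request` is a hypothetical name for illustration, not part of the server's code):

```python
# Hypothetical sketch of the request-building pattern used by the handler:
# collect optional filters and drop None values so the API payload stays minimal.
from typing import Any, Dict, List, Optional


def build_batch_request(
    dag_ids: Optional[List[str]] = None,
    state: Optional[List[str]] = None,
    page_limit: Optional[int] = None,
    **extra: Any,
) -> Dict[str, Any]:
    params = {"dag_ids": dag_ids, "state": state, "page_limit": page_limit, **extra}
    # Keep only the keys the caller explicitly set.
    return {k: v for k, v in params.items() if v is not None}


request = build_batch_request(dag_ids=["etl_daily"], state=["success"], page_limit=10)
# request == {"dag_ids": ["etl_daily"], "state": ["success"], "page_limit": 10}
```

A dict comprehension like this is equivalent to the explicit `if ... is not None` chain in the handler; the handler's unrolled form simply keeps each parameter name visible in the source.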
  • Local registration function get_all_functions() that includes the tuple for registering the get_dag_runs_batch tool with name, description, and read-only flag.
    def get_all_functions() -> list[tuple[Callable, str, str, bool]]:
        """Return list of (function, name, description, is_read_only) tuples for registration."""
        return [
            (post_dag_run, "post_dag_run", "Trigger a DAG by ID", False),
            (get_dag_runs, "get_dag_runs", "Get DAG runs by ID", True),
            (get_dag_runs_batch, "get_dag_runs_batch", "List DAG runs (batch)", True),
            (get_dag_run, "get_dag_run", "Get a DAG run by DAG ID and DAG run ID", True),
            (update_dag_run_state, "update_dag_run_state", "Update a DAG run state by DAG ID and DAG run ID", False),
            (delete_dag_run, "delete_dag_run", "Delete a DAG run by DAG ID and DAG run ID", False),
            (clear_dag_run, "clear_dag_run", "Clear a DAG run", False),
            (set_dag_run_note, "set_dag_run_note", "Update the DagRun note", False),
            (get_upstream_dataset_events, "get_upstream_dataset_events", "Get dataset events for a DAG run", True),
        ]
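Each tuple's final boolean is the read-only flag consumed by `filter_functions_for_read_only` in `main.py`. A minimal sketch of such a filter, assuming it simply keeps tuples whose flag is `True` (the project's actual implementation may differ):

```python
from typing import Callable


def filter_functions_for_read_only(
    functions: list[tuple[Callable, str, str, bool]],
) -> list[tuple[Callable, str, str, bool]]:
    # Keep only tuples whose is_read_only flag (fourth element) is True.
    return [entry for entry in functions if entry[3]]


# Illustrative stand-ins for two of the registered handlers:
def post_dag_run(): ...
def get_dag_runs_batch(): ...


tools = [
    (post_dag_run, "post_dag_run", "Trigger a DAG by ID", False),
    (get_dag_runs_batch, "get_dag_runs_batch", "List DAG runs (batch)", True),
]
read_only_tools = filter_functions_for_read_only(tools)
# Only the read-only get_dag_runs_batch entry survives.
```

Because `get_dag_runs_batch` is flagged read-only, it remains available even when the server is started in read-only mode, while mutating tools such as `post_dag_run` are dropped.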
  • src/main.py:79-92 (registration)
    Global MCP tool registration loop in main.py that imports and adds functions from dagrun.py (via get_dagrun_functions) to the MCP server app using app.add_tool, including get_dag_runs_batch.
    logging.debug(f"Adding API: {api}")
    get_function = APITYPE_TO_FUNCTIONS[APIType(api)]
    try:
        functions = get_function()
    except NotImplementedError:
        continue
    
    # Filter functions for read-only mode if requested
    if read_only:
        functions = filter_functions_for_read_only(functions)
    
    for func, name, description, *_ in functions:
        app.add_tool(func, name=name, description=description)
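The loop above dispatches through `APITYPE_TO_FUNCTIONS` and silently skips API groups whose getter raises `NotImplementedError`. A self-contained sketch of that dispatch, with a hypothetical second API group and a list standing in for `app.add_tool`:

```python
from enum import Enum
from typing import Callable


class APIType(Enum):
    # Member names here are illustrative; only DAGRUN is taken from the text.
    DAGRUN = "dagrun"
    DATASET = "dataset"


def get_dagrun_functions() -> list[tuple]:
    return [(lambda: None, "get_dag_runs_batch", "List DAG runs (batch)", True)]


def get_dataset_functions() -> list[tuple]:
    # A not-yet-implemented group: the registration loop skips it.
    raise NotImplementedError


APITYPE_TO_FUNCTIONS: dict[APIType, Callable] = {
    APIType.DAGRUN: get_dagrun_functions,
    APIType.DATASET: get_dataset_functions,
}

registered: list[str] = []
for api in ("dagrun", "dataset"):
    try:
        functions = APITYPE_TO_FUNCTIONS[APIType(api)]()
    except NotImplementedError:
        continue
    for func, name, description, *_ in functions:
        registered.append(name)  # stand-in for app.add_tool(func, name=name, ...)
# registered == ["get_dag_runs_batch"]
```

The `try/except NotImplementedError: continue` idiom lets the server enumerate every declared API type while registering only the groups that actually provide handlers.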
  • Helper utility to generate the Airflow UI URL for a specific DAG run, used within the get_dag_runs_batch handler to enrich response data.
    def get_dag_run_url(dag_id: str, dag_run_id: str) -> str:
        return f"{AIRFLOW_HOST}/dags/{dag_id}/grid?dag_run_id={dag_run_id}"
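The helper interpolates the raw run ID into the query string. Since Airflow run IDs often contain `:` and `+` (e.g. `manual__<timestamp>`), a defensive variant might percent-encode the ID; the encoding shown here is an assumption added for illustration, not part of the source:

```python
from urllib.parse import quote

AIRFLOW_HOST = "http://localhost:8080"  # assumed host for illustration


def get_dag_run_url(dag_id: str, dag_run_id: str) -> str:
    # Variant of the helper above that percent-encodes the run ID,
    # since IDs like "manual__2024-01-01T00:00:00+00:00" contain ':' and '+'.
    return f"{AIRFLOW_HOST}/dags/{dag_id}/grid?dag_run_id={quote(dag_run_id, safe='')}"


url = get_dag_run_url("etl_daily", "manual__2024-01-01T00:00:00+00:00")
# → "http://localhost:8080/dags/etl_daily/grid?dag_run_id=manual__2024-01-01T00%3A00%3A00%2B00%3A00"
```

An unencoded `+` in a query string is interpreted as a space by many servers, so encoding keeps the grid-view deep link unambiguous.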
