MCP Server for Apache Airflow

by yangkyeongmo

get_dag_runs_batch

Retrieve multiple Airflow DAG runs in batch with filters for DAG IDs, execution dates, states, and pagination to monitor workflow execution.

Instructions

List DAG runs (batch)

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| dag_ids | No | List of DAG IDs to filter by | |
| execution_date_gte | No | Earliest execution date (ISO 8601) | |
| execution_date_lte | No | Latest execution date (ISO 8601) | |
| start_date_gte | No | Earliest start date (ISO 8601) | |
| start_date_lte | No | Latest start date (ISO 8601) | |
| end_date_gte | No | Earliest end date (ISO 8601) | |
| end_date_lte | No | Latest end date (ISO 8601) | |
| state | No | List of DAG run states to filter by (e.g. success, failed) | |
| order_by | No | Field to sort results by | |
| page_offset | No | Number of results to skip | |
| page_limit | No | Maximum number of results to return | |
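Every field is an optional filter, so a call only needs to supply the filters it cares about. As an illustration (the values below are hypothetical, not from the source):

```python
# Hypothetical filter arguments for get_dag_runs_batch; every field is
# optional, so only the filters actually needed are passed.
example_args = {
    "dag_ids": ["etl_daily"],                           # restrict to specific DAGs
    "execution_date_gte": "2024-01-01T00:00:00+00:00",  # ISO 8601 lower bound
    "state": ["failed"],                                # only failed runs
    "order_by": "-execution_date",                      # newest first
    "page_limit": 25,                                   # pagination
}
```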

Implementation Reference

  • The core handler function implementing the get_dag_runs_batch tool. It constructs a filter request from parameters, calls the Airflow API's get_dag_runs_batch endpoint, enhances each DAG run with a UI URL, and returns the formatted response.
    ```python
    async def get_dag_runs_batch(
        dag_ids: Optional[List[str]] = None,
        execution_date_gte: Optional[str] = None,
        execution_date_lte: Optional[str] = None,
        start_date_gte: Optional[str] = None,
        start_date_lte: Optional[str] = None,
        end_date_gte: Optional[str] = None,
        end_date_lte: Optional[str] = None,
        state: Optional[List[str]] = None,
        order_by: Optional[str] = None,
        page_offset: Optional[int] = None,
        page_limit: Optional[int] = None,
    ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
        # Build request dictionary, including only the filters that were provided
        request: Dict[str, Any] = {}
        if dag_ids is not None:
            request["dag_ids"] = dag_ids
        if execution_date_gte is not None:
            request["execution_date_gte"] = execution_date_gte
        if execution_date_lte is not None:
            request["execution_date_lte"] = execution_date_lte
        if start_date_gte is not None:
            request["start_date_gte"] = start_date_gte
        if start_date_lte is not None:
            request["start_date_lte"] = start_date_lte
        if end_date_gte is not None:
            request["end_date_gte"] = end_date_gte
        if end_date_lte is not None:
            request["end_date_lte"] = end_date_lte
        if state is not None:
            request["state"] = state
        if order_by is not None:
            request["order_by"] = order_by
        if page_offset is not None:
            request["page_offset"] = page_offset
        if page_limit is not None:
            request["page_limit"] = page_limit

        response = dag_run_api.get_dag_runs_batch(list_dag_runs_form=request)

        # Convert response to dictionary for easier manipulation
        response_dict = response.to_dict()

        # Add UI links to each DAG run
        for dag_run in response_dict.get("dag_runs", []):
            dag_run["ui_url"] = get_dag_run_url(dag_run["dag_id"], dag_run["dag_run_id"])

        return [types.TextContent(type="text", text=str(response_dict))]
    ```
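The chain of `if ... is not None` checks in the handler simply drops unset parameters before calling the API. A minimal sketch of the same filtering, assuming only that omitted parameters default to `None` (the helper name `build_request` is hypothetical, not from the source):

```python
from typing import Any, Dict

def build_request(**params: Any) -> Dict[str, Any]:
    # Keep only parameters that were explicitly provided, mirroring the
    # if-chains in the handler above.
    return {key: value for key, value in params.items() if value is not None}

request = build_request(dag_ids=["example_dag"], state=["success"], page_limit=None)
# page_limit is dropped; only dag_ids and state remain in the request
```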
  • Specific registration tuple for the get_dag_runs_batch tool within the get_all_functions() return list, including the handler function, tool name, description, and read-only status.
    ```python
    (get_dag_runs_batch, "get_dag_runs_batch", "List DAG runs (batch)", True),
    ```
  • Helper utility function used by the handler to generate Airflow UI URLs for each DAG run in the batch response.
    ```python
    def get_dag_run_url(dag_id: str, dag_run_id: str) -> str:
        return f"{AIRFLOW_HOST}/dags/{dag_id}/grid?dag_run_id={dag_run_id}"
    ```
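For illustration, with a placeholder `AIRFLOW_HOST` (the real value comes from the server's configuration, not shown here), the helper produces grid-view deep links like:

```python
AIRFLOW_HOST = "http://localhost:8080"  # assumed placeholder, not from the source

def get_dag_run_url(dag_id: str, dag_run_id: str) -> str:
    # Deep link into the Airflow UI grid view, filtered to a single DAG run.
    return f"{AIRFLOW_HOST}/dags/{dag_id}/grid?dag_run_id={dag_run_id}"

url = get_dag_run_url("example_dag", "manual__2024-01-01")
# → "http://localhost:8080/dags/example_dag/grid?dag_run_id=manual__2024-01-01"
```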
  • src/main.py:95-96 (registration)
    Top-level registration loop in main.py that processes functions from dagrun.get_all_functions (including get_dag_runs_batch) and registers them as MCP tools using fastmcp.tools.Tool.from_function.
    ```python
    for func, name, description, *_ in functions:
        app.add_tool(Tool.from_function(func, name=name, description=description))
    ```
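The loop unpacks each `(handler, name, description, read_only)` tuple and discards trailing fields via `*_`. A self-contained sketch of that pattern (the `registered` dict is a hypothetical stand-in; the real code registers tools with `fastmcp.tools.Tool.from_function`):

```python
from typing import Any, Callable, Dict, List, Tuple

def handler() -> str:
    return "ok"

# (function, tool name, description, read-only flag), as in get_all_functions()
functions: List[Tuple[Callable[..., Any], str, str, bool]] = [
    (handler, "get_dag_runs_batch", "List DAG runs (batch)", True),
]

registered: Dict[str, Tuple[Callable[..., Any], str]] = {}
for func, name, description, *_ in functions:  # *_ discards the read-only flag
    registered[name] = (func, description)
```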

MCP directory API

We provide all the information about MCP servers via our MCP API.

```shell
curl -X GET 'https://glama.ai/api/mcp/v1/servers/yangkyeongmo/mcp-server-apache-airflow'
```
