get_dag_runs_batch
Retrieve multiple Airflow DAG runs in a single batch request, filtered by DAG ID, execution, start, and end date ranges, and state, with ordering and pagination, to monitor workflow execution.
Instructions
List DAG runs (batch)
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_ids | No | ||
| execution_date_gte | No | ||
| execution_date_lte | No | ||
| start_date_gte | No | ||
| start_date_lte | No | ||
| end_date_gte | No | ||
| end_date_lte | No | ||
| state | No | ||
| order_by | No | ||
| page_offset | No | ||
| page_limit | No | ||
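Since every parameter is optional, a call only needs to carry the filters it actually uses. The sketch below illustrates one way to assemble such a payload. `build_batch_request` is a hypothetical helper that covers only a subset of the parameters, and the DAG IDs and values are made up. The ISO 8601 timestamp and the `-` prefix for descending order are assumptions based on Airflow REST API conventions, not something this page states.

```python
from typing import Any, Dict, List, Optional


def build_batch_request(
    dag_ids: Optional[List[str]] = None,
    start_date_gte: Optional[str] = None,
    state: Optional[List[str]] = None,
    order_by: Optional[str] = None,
    page_offset: Optional[int] = None,
    page_limit: Optional[int] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: keep only the filters that were supplied."""
    candidates = {
        "dag_ids": dag_ids,
        "start_date_gte": start_date_gte,
        "state": state,
        "order_by": order_by,
        "page_offset": page_offset,
        "page_limit": page_limit,
    }
    # Compare against None rather than truthiness so that 0 and [] survive
    return {key: value for key, value in candidates.items() if value is not None}


# Illustrative values only: failed runs of two DAGs, newest first
request = build_batch_request(
    dag_ids=["etl_daily", "etl_hourly"],
    start_date_gte="2024-01-01T00:00:00+00:00",  # assumed ISO 8601 format
    state=["failed"],
    order_by="-start_date",  # assumed: "-" prefix requests descending order
    page_limit=25,
)
print(sorted(request))
```

Omitted parameters never reach the payload, while an explicit `page_offset=0` is kept because the check is against `None`, not truthiness.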
Implementation Reference
- src/airflow/dagrun.py:124-171 (handler) The core handler function implementing the get_dag_runs_batch tool. It constructs a filter request from parameters, calls the Airflow API's get_dag_runs_batch endpoint, enhances each DAG run with a UI URL, and returns the formatted response.

      async def get_dag_runs_batch(
          dag_ids: Optional[List[str]] = None,
          execution_date_gte: Optional[str] = None,
          execution_date_lte: Optional[str] = None,
          start_date_gte: Optional[str] = None,
          start_date_lte: Optional[str] = None,
          end_date_gte: Optional[str] = None,
          end_date_lte: Optional[str] = None,
          state: Optional[List[str]] = None,
          order_by: Optional[str] = None,
          page_offset: Optional[int] = None,
          page_limit: Optional[int] = None,
      ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
          # Build request dictionary
          request: Dict[str, Any] = {}
          if dag_ids is not None:
              request["dag_ids"] = dag_ids
          if execution_date_gte is not None:
              request["execution_date_gte"] = execution_date_gte
          if execution_date_lte is not None:
              request["execution_date_lte"] = execution_date_lte
          if start_date_gte is not None:
              request["start_date_gte"] = start_date_gte
          if start_date_lte is not None:
              request["start_date_lte"] = start_date_lte
          if end_date_gte is not None:
              request["end_date_gte"] = end_date_gte
          if end_date_lte is not None:
              request["end_date_lte"] = end_date_lte
          if state is not None:
              request["state"] = state
          if order_by is not None:
              request["order_by"] = order_by
          if page_offset is not None:
              request["page_offset"] = page_offset
          if page_limit is not None:
              request["page_limit"] = page_limit

          response = dag_run_api.get_dag_runs_batch(list_dag_runs_form=request)

          # Convert response to dictionary for easier manipulation
          response_dict = response.to_dict()

          # Add UI links to each DAG run
          for dag_run in response_dict.get("dag_runs", []):
              dag_run["ui_url"] = get_dag_run_url(dag_run["dag_id"], dag_run["dag_run_id"])

          return [types.TextContent(type="text", text=str(response_dict))]

- src/airflow/dagrun.py:22-22 (registration) Specific registration tuple for the get_dag_runs_batch tool within the get_all_functions() return list, including the handler function, tool name, description, and read-only status.

      (get_dag_runs_batch, "get_dag_runs_batch", "List DAG runs (batch)", True),

- src/airflow/dagrun.py:32-33 (helper) Helper utility function used by the handler to generate Airflow UI URLs for each DAG run in the batch response.

      def get_dag_run_url(dag_id: str, dag_run_id: str) -> str:
          return f"{AIRFLOW_HOST}/dags/{dag_id}/grid?dag_run_id={dag_run_id}"

- src/main.py:95-96 (registration) Top-level registration loop in main.py that processes functions from dagrun.get_all_functions (including get_dag_runs_batch) and registers them as MCP tools using fastmcp.tools.Tool.from_function.

      for func, name, description, *_ in functions:
          app.add_tool(Tool.from_function(func, name=name, description=description))
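To make the handler's enrichment step concrete, the sketch below runs the same loop over a hand-written response dictionary instead of a live Airflow API call. The `AIRFLOW_HOST` value, the sample runs, and the `add_ui_urls` wrapper are assumptions for illustration only. The URL format string is the one from the helper referenced above.

```python
from typing import Any, Dict

# Assumption: placeholder host; the real value comes from the server's configuration
AIRFLOW_HOST = "http://localhost:8080"


def get_dag_run_url(dag_id: str, dag_run_id: str) -> str:
    # Same format string as the helper in src/airflow/dagrun.py
    return f"{AIRFLOW_HOST}/dags/{dag_id}/grid?dag_run_id={dag_run_id}"


def add_ui_urls(response_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical wrapper around the handler's loop: attach ui_url to each run."""
    for dag_run in response_dict.get("dag_runs", []):
        dag_run["ui_url"] = get_dag_run_url(dag_run["dag_id"], dag_run["dag_run_id"])
    return response_dict


# Hand-written stand-in for response.to_dict(); the field values are made up
sample = {
    "dag_runs": [
        {"dag_id": "etl_daily", "dag_run_id": "manual_run_1", "state": "success"},
        {"dag_id": "etl_hourly", "dag_run_id": "manual_run_2", "state": "failed"},
    ],
    "total_entries": 2,
}

enriched = add_ui_urls(sample)
print(enriched["dag_runs"][0]["ui_url"])
# prints: http://localhost:8080/dags/etl_daily/grid?dag_run_id=manual_run_1
```

Because the loop reads `response_dict.get("dag_runs", [])`, a response without a `dag_runs` key passes through unchanged rather than raising a `KeyError`.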