get_dag_runs_batch
Retrieve multiple DAG runs from Apache Airflow with filtering by DAG IDs, date ranges, and states to monitor workflow execution status in batch operations.
Instructions
List DAG runs (batch)
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_ids | No | ||
| end_date_gte | No | ||
| end_date_lte | No | ||
| execution_date_gte | No | ||
| execution_date_lte | No | ||
| order_by | No | ||
| page_limit | No | ||
| page_offset | No | ||
| start_date_gte | No | ||
| start_date_lte | No | ||
| state | No | ||
Input Schema (JSON Schema)
{
"properties": {
"dag_ids": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"title": "Dag Ids"
},
"end_date_gte": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "End Date Gte"
},
"end_date_lte": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "End Date Lte"
},
"execution_date_gte": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Execution Date Gte"
},
"execution_date_lte": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Execution Date Lte"
},
"order_by": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Order By"
},
"page_limit": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Page Limit"
},
"page_offset": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Page Offset"
},
"start_date_gte": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Start Date Gte"
},
"start_date_lte": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Start Date Lte"
},
"state": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"title": "State"
}
},
"type": "object"
}
Implementation Reference
- src/airflow/dagrun.py:124-171 (handler)The main asynchronous handler function implementing the 'get_dag_runs_batch' tool. It constructs a request from input parameters, calls the Airflow DAGRunApi.get_dag_runs_batch, adds UI links to the response, and returns formatted text content.async def get_dag_runs_batch( dag_ids: Optional[List[str]] = None, execution_date_gte: Optional[str] = None, execution_date_lte: Optional[str] = None, start_date_gte: Optional[str] = None, start_date_lte: Optional[str] = None, end_date_gte: Optional[str] = None, end_date_lte: Optional[str] = None, state: Optional[List[str]] = None, order_by: Optional[str] = None, page_offset: Optional[int] = None, page_limit: Optional[int] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: # Build request dictionary request: Dict[str, Any] = {} if dag_ids is not None: request["dag_ids"] = dag_ids if execution_date_gte is not None: request["execution_date_gte"] = execution_date_gte if execution_date_lte is not None: request["execution_date_lte"] = execution_date_lte if start_date_gte is not None: request["start_date_gte"] = start_date_gte if start_date_lte is not None: request["start_date_lte"] = start_date_lte if end_date_gte is not None: request["end_date_gte"] = end_date_gte if end_date_lte is not None: request["end_date_lte"] = end_date_lte if state is not None: request["state"] = state if order_by is not None: request["order_by"] = order_by if page_offset is not None: request["page_offset"] = page_offset if page_limit is not None: request["page_limit"] = page_limit response = dag_run_api.get_dag_runs_batch(list_dag_runs_form=request) # Convert response to dictionary for easier manipulation response_dict = response.to_dict() # Add UI links to each DAG run for dag_run in response_dict.get("dag_runs", []): dag_run["ui_url"] = get_dag_run_url(dag_run["dag_id"], dag_run["dag_run_id"]) return [types.TextContent(type="text", text=str(response_dict))]
- src/airflow/dagrun.py:17-29 (registration)The registration function get_all_functions that includes the tuple for 'get_dag_runs_batch' tool, specifying its handler, name, description, and read-only status.def get_all_functions() -> list[tuple[Callable, str, str, bool]]: """Return list of (function, name, description, is_read_only) tuples for registration.""" return [ (post_dag_run, "post_dag_run", "Trigger a DAG by ID", False), (get_dag_runs, "get_dag_runs", "Get DAG runs by ID", True), (get_dag_runs_batch, "get_dag_runs_batch", "List DAG runs (batch)", True), (get_dag_run, "get_dag_run", "Get a DAG run by DAG ID and DAG run ID", True), (update_dag_run_state, "update_dag_run_state", "Update a DAG run state by DAG ID and DAG run ID", False), (delete_dag_run, "delete_dag_run", "Delete a DAG run by DAG ID and DAG run ID", False), (clear_dag_run, "clear_dag_run", "Clear a DAG run", False), (set_dag_run_note, "set_dag_run_note", "Update the DagRun note", False), (get_upstream_dataset_events, "get_upstream_dataset_events", "Get dataset events for a DAG run", True), ]
- src/airflow/dagrun.py:32-33 (helper)Helper function to generate UI URL for a DAG run, used in the get_dag_runs_batch handler to enrich the response.def get_dag_run_url(dag_id: str, dag_run_id: str) -> str: return f"{AIRFLOW_HOST}/dags/{dag_id}/grid?dag_run_id={dag_run_id}"