get_dag_runs
Retrieve and filter DAG run information from Apache Airflow by specifying criteria such as execution date range, state, and pagination options.
Instructions
Get DAG runs by ID
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | Yes | ||
| end_date_gte | No | ||
| end_date_lte | No | ||
| execution_date_gte | No | ||
| execution_date_lte | No | ||
| limit | No | ||
| offset | No | ||
| order_by | No | ||
| start_date_gte | No | ||
| start_date_lte | No | ||
| state | No | ||
| updated_at_gte | No | ||
| updated_at_lte | No |
Implementation Reference
- src/airflow/dagrun.py:70-122 (handler)The main handler function implementing the get_dag_runs tool logic: fetches DAG runs for a specific DAG ID with optional filters via Airflow API, adds UI links, and returns formatted text content.async def get_dag_runs( dag_id: str, limit: Optional[int] = None, offset: Optional[int] = None, execution_date_gte: Optional[str] = None, execution_date_lte: Optional[str] = None, start_date_gte: Optional[str] = None, start_date_lte: Optional[str] = None, end_date_gte: Optional[str] = None, end_date_lte: Optional[str] = None, updated_at_gte: Optional[str] = None, updated_at_lte: Optional[str] = None, state: Optional[List[str]] = None, order_by: Optional[str] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: # Build parameters dictionary kwargs: Dict[str, Any] = {} if limit is not None: kwargs["limit"] = limit if offset is not None: kwargs["offset"] = offset if execution_date_gte is not None: kwargs["execution_date_gte"] = execution_date_gte if execution_date_lte is not None: kwargs["execution_date_lte"] = execution_date_lte if start_date_gte is not None: kwargs["start_date_gte"] = start_date_gte if start_date_lte is not None: kwargs["start_date_lte"] = start_date_lte if end_date_gte is not None: kwargs["end_date_gte"] = end_date_gte if end_date_lte is not None: kwargs["end_date_lte"] = end_date_lte if updated_at_gte is not None: kwargs["updated_at_gte"] = updated_at_gte if updated_at_lte is not None: kwargs["updated_at_lte"] = updated_at_lte if state is not None: kwargs["state"] = state if order_by is not None: kwargs["order_by"] = order_by response = dag_run_api.get_dag_runs(dag_id=dag_id, **kwargs) # Convert response to dictionary for easier manipulation response_dict = response.to_dict() # Add UI links to each DAG run for dag_run in response_dict.get("dag_runs", []): dag_run["ui_url"] = get_dag_run_url(dag_id, dag_run["dag_run_id"]) return [types.TextContent(type="text", text=str(response_dict))]
- src/airflow/dagrun.py:17-29 (registration)Local registration: get_all_functions() returns the list of DAG run related tools, including the tuple for get_dag_runs, which is imported and used in main.py to register tools with the MCP server.def get_all_functions() -> list[tuple[Callable, str, str, bool]]: """Return list of (function, name, description, is_read_only) tuples for registration.""" return [ (post_dag_run, "post_dag_run", "Trigger a DAG by ID", False), (get_dag_runs, "get_dag_runs", "Get DAG runs by ID", True), (get_dag_runs_batch, "get_dag_runs_batch", "List DAG runs (batch)", True), (get_dag_run, "get_dag_run", "Get a DAG run by DAG ID and DAG run ID", True), (update_dag_run_state, "update_dag_run_state", "Update a DAG run state by DAG ID and DAG run ID", False), (delete_dag_run, "delete_dag_run", "Delete a DAG run by DAG ID and DAG run ID", False), (clear_dag_run, "clear_dag_run", "Clear a DAG run", False), (set_dag_run_note, "set_dag_run_note", "Update the DagRun note", False), (get_upstream_dataset_events, "get_upstream_dataset_events", "Get dataset events for a DAG run", True), ]
- src/main.py:8-8 (registration)Global registration import: Imports the get_all_functions from dagrun module (aliased as get_dagrun_functions) to include its tools in the central tool registry.from src.airflow.dagrun import get_all_functions as get_dagrun_functions
- src/airflow/dagrun.py:32-33 (helper)Helper function to generate the Airflow UI URL for a specific DAG run, used by the get_dag_runs handler to enrich the response.def get_dag_run_url(dag_id: str, dag_run_id: str) -> str: return f"{AIRFLOW_HOST}/dags/{dag_id}/grid?dag_run_id={dag_run_id}"