list_dag_runs
Retrieve and filter DAG run history from Amazon MWAA environments to monitor workflow execution status, analyze performance, and troubleshoot issues.
Instructions
List DAG runs for a specific DAG.
Args: environment_name: Name of the MWAA environment dag_id: The DAG ID limit: Number of items to return state: Filter by state (queued, running, success, failed) execution_date_gte: Filter by execution date >= (ISO format) execution_date_lte: Filter by execution date <= (ISO format)
Returns: Dictionary containing list of DAG runs
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| environment_name | Yes | ||
| dag_id | Yes | ||
| limit | No | ||
| state | No | ||
| execution_date_gte | No | ||
| execution_date_lte | No |
Implementation Reference
- awslabs/mwaa_mcp_server/tools.py:320-341 (handler)The implementation logic for list_dag_runs in the tools class, which invokes the Airflow API.
async def list_dag_runs( self, environment_name: str, dag_id: str, limit: Optional[int] = 100, state: Optional[List[str]] = None, execution_date_gte: Optional[str] = None, execution_date_lte: Optional[str] = None, ) -> Dict[str, Any]: """List DAG runs via Airflow API.""" params: Dict[str, Any] = {"limit": limit} if state: params["state"] = state if execution_date_gte: params["execution_date_gte"] = execution_date_gte if execution_date_lte: params["execution_date_lte"] = execution_date_lte return self._invoke_airflow_api( environment_name, "GET", f"/dags/{dag_id}/dagRuns", params=params ) - awslabs/mwaa_mcp_server/server.py:354-380 (registration)The MCP tool registration and wrapper function for list_dag_runs.
@mcp.tool(name="list_dag_runs") async def list_dag_runs( environment_name: str, dag_id: str, limit: Optional[int] = 100, state: Optional[List[str]] = None, execution_date_gte: Optional[str] = None, execution_date_lte: Optional[str] = None, ) -> Dict[str, Any]: """List DAG runs for a specific DAG. Args: environment_name: Name of the MWAA environment dag_id: The DAG ID limit: Number of items to return state: Filter by state (queued, running, success, failed) execution_date_gte: Filter by execution date >= (ISO format) execution_date_lte: Filter by execution date <= (ISO format) Returns: Dictionary containing list of DAG runs """ limit_int = int(limit) if limit is not None else 100 return await tools.list_dag_runs( environment_name, dag_id,