list_task_instances
Find Airflow tasks that ran during specific time windows using flexible date filters across all DAGs and runs in MWAA environments.
Instructions
List task instances across DAGs with flexible time-based filtering.
This is the key tool for finding what tasks were running during a specific time window. Supports wildcards: omit dag_id or dag_run_id to query across all DAGs/runs.
Args:
- environment_name: Name of the MWAA environment
- dag_id: Filter by DAG ID (optional - omit for all DAGs)
- dag_run_id: Filter by DAG run ID (optional - omit for all runs)
- start_date_gte: Tasks that started at or after this time (ISO format)
- start_date_lte: Tasks that started at or before this time (ISO format)
- end_date_gte: Tasks that ended at or after this time (ISO format)
- end_date_lte: Tasks that ended at or before this time (ISO format)
- execution_date_gte: Filter by execution/logical date >= (ISO format)
- execution_date_lte: Filter by execution/logical date <= (ISO format)
- state: Filter by state (queued, running, success, failed, etc.)
- pool: Filter by pool name
- queue: Filter by queue name
- duration_gte: Filter by minimum duration in seconds
- duration_lte: Filter by maximum duration in seconds
- limit: Number of items to return (default 100)
- offset: Number of items to skip for pagination

Returns: Dictionary containing list of task instances with details
Example - Find tasks running between 2:30-2:40 AM:

    list_task_instances(
        environment_name="my-env",
        start_date_lte="2024-01-15T02:40:00Z",  # Started before 2:40
        end_date_gte="2024-01-15T02:30:00Z",    # Ended after 2:30 (or still running)
    )
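The example pairs `start_date_lte` with the window's end and `end_date_gte` with the window's start, which is the usual interval-overlap condition. A minimal sketch of that check on plain timestamps follows. The helper names `parse_iso` and `overlaps_window` are hypothetical and not part of the tool, and treating a missing end time as "still running" is an assumption of this sketch; the API's handling of tasks without an end date may differ.

```python
from datetime import datetime
from typing import Optional


def parse_iso(ts: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def overlaps_window(
    task_start: str,
    task_end: Optional[str],
    window_start: str,
    window_end: str,
) -> bool:
    """Return True if the task's run interval intersects the window.

    Mirrors start_date_lte=<window_end> and end_date_gte=<window_start>.
    A task with no end time is treated as still running (assumption).
    """
    started_in_time = parse_iso(task_start) <= parse_iso(window_end)
    if task_end is None:
        return started_in_time
    return started_in_time and parse_iso(task_end) >= parse_iso(window_start)
```

A task that started at 02:20 and ended at 02:35 overlaps the 02:30-02:40 window, while one that finished at 02:10 or started at 02:45 does not.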
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
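| environment_name | Yes | Name of the MWAA environment | |
| dag_id | No | Filter by DAG ID (omit for all DAGs) | |
| dag_run_id | No | Filter by DAG run ID (omit for all runs) | |
| start_date_gte | No | Tasks that started at or after this time (ISO format) | |
| start_date_lte | No | Tasks that started at or before this time (ISO format) | |
| end_date_gte | No | Tasks that ended at or after this time (ISO format) | |
| end_date_lte | No | Tasks that ended at or before this time (ISO format) | |
| execution_date_gte | No | Execution/logical date at or after this time (ISO format) | |
| execution_date_lte | No | Execution/logical date at or before this time (ISO format) | |
| state | No | Filter by state, given as a list (queued, running, success, failed, etc.) | |
| pool | No | Filter by pool name | |
| queue | No | Filter by queue name | |
| duration_gte | No | Minimum duration in seconds | |
| duration_lte | No | Maximum duration in seconds | |
| limit | No | Number of items to return | 100 |
| offset | No | Number of items to skip for pagination | 0 |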
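Since `limit` and `offset` page through results, collecting every match takes a loop. The sketch below is illustrative only: `fetch_page` is a hypothetical callable standing in for however the tool is invoked, and the `task_instances` response key is an assumption, as the source describes the return value only as a dictionary containing a list of task instances.

```python
from typing import Any, Callable, Dict, Iterator, List


def iter_task_instances(
    fetch_page: Callable[[int, int], Dict[str, Any]],
    page_size: int = 100,
) -> Iterator[Dict[str, Any]]:
    """Yield task instances page by page until a short page is returned.

    fetch_page(limit, offset) is a hypothetical wrapper around the tool call.
    """
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        # Assumed response key; adjust to the actual response shape.
        items: List[Dict[str, Any]] = page.get("task_instances", [])
        yield from items
        if len(items) < page_size:
            return  # A short (or empty) page means there is nothing left.
        offset += page_size
```

Stopping on a short page avoids depending on a total-count field, at the cost of one extra request when the result count is an exact multiple of the page size.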
Implementation Reference
- awslabs/mwaa_mcp_server/tools.py:371-436 (handler) The 'list_task_instances' method implements the logic to fetch task instances from the Airflow API, supporting various filters and wildcards for DAG IDs and run IDs.

      async def list_task_instances(
          self,
          environment_name: str,
          dag_id: Optional[str] = None,
          dag_run_id: Optional[str] = None,
          start_date_gte: Optional[str] = None,
          start_date_lte: Optional[str] = None,
          end_date_gte: Optional[str] = None,
          end_date_lte: Optional[str] = None,
          execution_date_gte: Optional[str] = None,
          execution_date_lte: Optional[str] = None,
          state: Optional[List[str]] = None,
          pool: Optional[str] = None,
          queue: Optional[str] = None,
          duration_gte: Optional[float] = None,
          duration_lte: Optional[float] = None,
          limit: Optional[int] = 100,
          offset: Optional[int] = 0,
      ) -> Dict[str, Any]:
          """List task instances across DAGs with flexible filtering via Airflow API.

          Uses the batch task instances endpoint which supports wildcards:
          - dag_id='~' means all DAGs
          - dag_run_id='~' means all DAG runs

          This enables time-range queries to find all tasks running in a specific window.
          """
          # Use wildcards if not specified
          dag_path = dag_id if dag_id else "~"
          run_path = dag_run_id if dag_run_id else "~"

          params: Dict[str, Any] = {
              "limit": limit,
              "offset": offset,
          }

          # Time-based filters
          if start_date_gte:
              params["start_date_gte"] = start_date_gte
          if start_date_lte:
              params["start_date_lte"] = start_date_lte
          if end_date_gte:
              params["end_date_gte"] = end_date_gte
          if end_date_lte:
              params["end_date_lte"] = end_date_lte
          if execution_date_gte:
              params["execution_date_gte"] = execution_date_gte
          if execution_date_lte:
              params["execution_date_lte"] = execution_date_lte

          # State and resource filters
          if state:
              params["state"] = state
          if pool:
              params["pool"] = pool
          if queue:
              params["queue"] = queue

          # Duration filters
          if duration_gte is not None:
              params["duration_gte"] = duration_gte
          if duration_lte is not None:
              params["duration_lte"] = duration_lte

          endpoint = f"/dags/{dag_path}/dagRuns/{run_path}/taskInstances"
          return self._invoke_airflow_api(environment_name, "GET", endpoint, params=params)

- awslabs/mwaa_mcp_server/server.py:436-493 (registration) Tool registration and handler wrapper for 'list_task_instances' in the MCP server.

      @mcp.tool(name="list_task_instances")
      async def list_task_instances(
          environment_name: str,
          dag_id: Optional[str] = None,
          dag_run_id: Optional[str] = None,
          start_date_gte: Optional[str] = None,
          start_date_lte: Optional[str] = None,
          end_date_gte: Optional[str] = None,
          end_date_lte: Optional[str] = None,
          execution_date_gte: Optional[str] = None,
          execution_date_lte: Optional[str] = None,
          state: Optional[List[str]] = None,
          pool: Optional[str] = None,
          queue: Optional[str] = None,
          duration_gte: Optional[float] = None,
          duration_lte: Optional[float] = None,
          limit: Optional[int] = 100,
          offset: Optional[int] = 0,
      ) -> Dict[str, Any]:
          """List task instances across DAGs with flexible time-based filtering.

          This is the key tool for finding what tasks were running during a specific time window.
          Supports wildcards: omit dag_id or dag_run_id to query across all DAGs/runs.

          Args:
              environment_name: Name of the MWAA environment
              dag_id: Filter by DAG ID (optional - omit for all DAGs)
              dag_run_id: Filter by DAG run ID (optional - omit for all runs)
              start_date_gte: Tasks that started at or after this time (ISO format)
              start_date_lte: Tasks that started at or before this time (ISO format)
              end_date_gte: Tasks that ended at or after this time (ISO format)
              end_date_lte: Tasks that ended at or before this time (ISO format)
              execution_date_gte: Filter by execution/logical date >= (ISO format)
              execution_date_lte: Filter by execution/logical date <= (ISO format)
              state: Filter by state (queued, running, success, failed, etc.)
              pool: Filter by pool name
              queue: Filter by queue name
              duration_gte: Filter by minimum duration in seconds
              duration_lte: Filter by maximum duration in seconds
              limit: Number of items to return (default 100)
              offset: Number of items to skip for pagination

          Returns:
              Dictionary containing list of task instances with details

          Example - Find tasks running between 2:30-2:40 AM:
              list_task_instances(
                  environment_name="my-env",
                  start_date_lte="2024-01-15T02:40:00Z",  # Started before 2:40
                  end_date_gte="2024-01-15T02:30:00Z",    # Ended after 2:30 (or still running)
              )
          """
          limit_int = int(limit) if limit is not None else 100
          offset_int = int(offset) if offset is not None else 0
          duration_gte_float = float(duration_gte) if duration_gte is not None else None
          duration_lte_float = float(duration_lte) if duration_lte is not None else None

          return await tools.list_task_instances(