list_task_instances_all
Retrieve task instances across all DAGs or specific DAGs with advanced filtering by date, duration, state, pool, queue, and pagination options for Airflow cluster monitoring.
Instructions
[Tool Role]: Lists task instances across all DAGs or filtered by specific DAG with comprehensive filtering options.
IMPORTANT: When users provide natural language dates, calculate relative dates using the current server time context (internally via get_current_time_context):
- "yesterday" = current_date - 1 day
- "last week" = current_date - 7 days to current_date - 1 day
- "last 3 days" = current_date - 3 days to current_date
- "today" = current_date
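A minimal sketch of this relative-date arithmetic, assuming UTC and the ISO 8601 format the filters expect (the `relative_range` helper is illustrative, not part of the tool):

```python
from datetime import datetime, timedelta, timezone

def relative_range(phrase: str, now: datetime) -> tuple:
    """Map a natural-language phrase to (date_gte, date_lte) filter strings."""
    day = timedelta(days=1)
    today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    if phrase == "yesterday":
        start, end = today - day, today - timedelta(seconds=1)
    elif phrase == "last week":
        # current_date - 7 days through the end of yesterday
        start, end = today - 7 * day, today - timedelta(seconds=1)
    elif phrase == "last 3 days":
        start, end = today - 3 * day, now
    else:  # "today"
        start, end = today, now
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return start.strftime(fmt), end.strftime(fmt)

now = datetime(2024, 1, 10, 12, 0, tzinfo=timezone.utc)
print(relative_range("yesterday", now))
# → ('2024-01-09T00:00:00Z', '2024-01-09T23:59:59Z')
```

The resulting pair can be passed directly as `execution_date_gte` / `execution_date_lte`.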
Args:
- dag_id: Filter by DAG ID (optional)
- dag_run_id: Filter by DAG run ID (optional)
- execution_date_gte: Filter by execution date greater than or equal to (ISO 8601 format with timezone, e.g., '2024-01-01T00:00:00Z', optional)
- execution_date_lte: Filter by execution date less than or equal to (ISO 8601 format with timezone, e.g., '2024-01-01T23:59:59Z', optional)
- start_date_gte: Filter by start date greater than or equal to (ISO 8601 format with timezone, optional)
- start_date_lte: Filter by start date less than or equal to (ISO 8601 format with timezone, optional)
- end_date_gte: Filter by end date greater than or equal to (ISO 8601 format with timezone, optional)
- end_date_lte: Filter by end date less than or equal to (ISO 8601 format with timezone, optional)
- duration_gte: Filter by duration greater than or equal to (seconds, optional)
- duration_lte: Filter by duration less than or equal to (seconds, optional)
- state: Filter by task state (queued, running, success, failed, up_for_retry, up_for_reschedule, upstream_failed, skipped, deferred, scheduled, removed, restarting; optional)
- pool: Filter by pool name (optional)
- queue: Filter by queue name (optional)
- limit: Maximum number of task instances to return (default: 20)
- offset: Number of task instances to skip for pagination (default: 0)
Returns: A paginated list of task instances with comprehensive information: task_instances, total_entries, limit, offset
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | No | Filter by DAG ID | |
| dag_run_id | No | Filter by DAG run ID | |
| duration_gte | No | Filter by duration greater than or equal to (seconds) | |
| duration_lte | No | Filter by duration less than or equal to (seconds) | |
| end_date_gte | No | Filter by end date greater than or equal to (ISO 8601 with timezone) | |
| end_date_lte | No | Filter by end date less than or equal to (ISO 8601 with timezone) | |
| execution_date_gte | No | Filter by execution date greater than or equal to (ISO 8601 with timezone) | |
| execution_date_lte | No | Filter by execution date less than or equal to (ISO 8601 with timezone) | |
| limit | No | Maximum number of task instances to return | 20 |
| offset | No | Number of task instances to skip for pagination | 0 |
| pool | No | Filter by pool name | |
| queue | No | Filter by queue name | |
| start_date_gte | No | Filter by start date greater than or equal to (ISO 8601 with timezone) | |
| start_date_lte | No | Filter by start date less than or equal to (ISO 8601 with timezone) | |
| state | No | Filter by task state (e.g., queued, running, success, failed) | |
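Because `limit` and `offset` drive pagination, callers typically loop until `total_entries` is exhausted. A hedged sketch, assuming a `fetch_page` callable that wraps the tool and returns the documented response shape (both names are illustrative):

```python
from typing import Any, Callable, Dict, Iterator

def iter_task_instances(
    fetch_page: Callable[..., Dict[str, Any]],
    page_size: int = 20,
    **filters: Any,
) -> Iterator[Dict[str, Any]]:
    """Yield every matching task instance across pages using limit/offset."""
    offset = 0
    while True:
        page = fetch_page(limit=page_size, offset=offset, **filters)
        for ti in page["task_instances"]:
            yield ti
        offset += page_size
        if offset >= page["total_entries"]:
            break

# Stub in place of the real tool call, to show the loop terminating:
def fake_fetch(limit, offset, **_):
    data = [{"task_id": f"t{i}"} for i in range(45)]
    return {"task_instances": data[offset:offset + limit], "total_entries": len(data)}

print(sum(1 for _ in iter_task_instances(fake_fetch, page_size=20)))  # → 45
```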
Implementation Reference
- The core handler function for the `list_task_instances_all` tool. It constructs query parameters from its inputs and makes a GET request to Airflow's `/taskInstances` endpoint:

```python
@mcp.tool()
async def list_task_instances_all(
    dag_id: Optional[str] = None,
    dag_run_id: Optional[str] = None,
    task_id: Optional[str] = None,
    execution_date_gte: Optional[str] = None,
    execution_date_lte: Optional[str] = None,
    start_date_gte: Optional[str] = None,
    start_date_lte: Optional[str] = None,
    end_date_gte: Optional[str] = None,
    end_date_lte: Optional[str] = None,
    duration_gte: Optional[float] = None,
    duration_lte: Optional[float] = None,
    state: Optional[List[str]] = None,
    pool: Optional[List[str]] = None,
    limit: int = 100,
    offset: int = 0
) -> Dict[str, Any]:
    """[Tool Role]: Lists task instances with comprehensive filtering options."""
    params = {'limit': limit, 'offset': offset}
    # Add all filter parameters
    if dag_id:
        params['dag_id'] = dag_id
    if dag_run_id:
        params['dag_run_id'] = dag_run_id
    if task_id:
        params['task_id'] = task_id
    if execution_date_gte:
        params['execution_date_gte'] = execution_date_gte
    if execution_date_lte:
        params['execution_date_lte'] = execution_date_lte
    if start_date_gte:
        params['start_date_gte'] = start_date_gte
    if start_date_lte:
        params['start_date_lte'] = start_date_lte
    if end_date_gte:
        params['end_date_gte'] = end_date_gte
    if end_date_lte:
        params['end_date_lte'] = end_date_lte
    if duration_gte:
        params['duration_gte'] = duration_gte
    if duration_lte:
        params['duration_lte'] = duration_lte
    if state:
        params['state'] = state
    if pool:
        params['pool'] = pool
    query_string = "&".join([f"{k}={v}" for k, v in params.items()])
    resp = await airflow_request("GET", f"/taskInstances?{query_string}")
    resp.raise_for_status()
    return resp.json()
```
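One caveat with hand-built query strings: list-valued filters such as `state` and `pool` would be interpolated as Python list reprs, and values are not percent-encoded. A safer sketch (an alternative illustration, not the repository's code) uses `urllib.parse.urlencode` with `doseq=True` so list parameters repeat the key:

```python
from urllib.parse import urlencode

params = {
    "limit": 20,
    "offset": 0,
    "state": ["failed", "up_for_retry"],  # list values repeat the key
    "pool": ["default_pool"],
    "execution_date_gte": "2024-01-01T00:00:00+00:00",
}
query_string = urlencode(params, doseq=True)
print(query_string)
# → limit=20&offset=0&state=failed&state=up_for_retry&pool=default_pool&execution_date_gte=2024-01-01T00%3A00%3A00%2B00%3A00
```

Note that `urlencode` also percent-encodes the `:` and `+` characters that ISO 8601 timestamps contain.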
- `src/mcp_airflow_api/tools/v1_tools.py:23` (registration): registers the common tools, including `list_task_instances_all`, in the v1 API context via `common_tools.register_common_tools(mcp)`.
- `src/mcp_airflow_api/tools/v2_tools.py:24` (registration): registers the same common tools in the v2 API context via `common_tools.register_common_tools(mcp)`.
- The `register_common_tools` function defines and registers all common tools, including the handler for `list_task_instances_all`, using `@mcp.tool()` decorators:

```python
def register_common_tools(mcp):
    """Register all 43 common tools that work with both v1 and v2 APIs."""
    if airflow_request is None:
        raise RuntimeError("airflow_request function must be set before registering common tools")
    logger.info("Registering common tools shared between v1 and v2")
```