list_task_instances_batch
Retrieve and filter task instances in bulk from Apache Airflow using criteria like DAG IDs, task states, execution dates, and duration for streamlined batch operations.
Instructions
[Tool Role]: Lists task instances in batch with multiple filtering criteria for bulk operations.
Relative date filters (if provided) are resolved against the server's current time.
Args:
- dag_ids: List of DAG IDs to filter by (optional)
- dag_run_ids: List of DAG run IDs to filter by (optional)
- task_ids: List of task IDs to filter by (optional)
- execution_date_gte: Filter by execution date greater than or equal to (ISO format, optional)
- execution_date_lte: Filter by execution date less than or equal to (ISO format, optional)
- start_date_gte: Filter by start date greater than or equal to (ISO format, optional)
- start_date_lte: Filter by start date less than or equal to (ISO format, optional)
- end_date_gte: Filter by end date greater than or equal to (ISO format, optional)
- end_date_lte: Filter by end date less than or equal to (ISO format, optional)
- duration_gte: Filter by duration greater than or equal to (seconds, optional)
- duration_lte: Filter by duration less than or equal to (seconds, optional)
- state: List of task states to filter by (optional)
- pool: List of pool names to filter by (optional)
- queue: List of queue names to filter by (optional)
Returns: Batch list of task instances with filtering results: task_instances, total_entries, applied_filters
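As an illustration of how the documented arguments fit together, the following minimal sketch assembles a filter dictionary with ISO-formatted start dates and a duration threshold in seconds. The helper `build_batch_filters`, its `lookback_hours` parameter, and the DAG ID `etl_daily` are hypothetical names introduced for this example; only the resulting keys (`dag_ids`, `state`, `start_date_gte`, `start_date_lte`, `duration_gte`) come from the argument list above.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional


def build_batch_filters(
    dag_ids: Optional[List[str]] = None,
    state: Optional[List[str]] = None,
    lookback_hours: Optional[int] = None,
    duration_gte: Optional[float] = None,
    now: Optional[datetime] = None,
) -> Dict[str, Any]:
    """Assemble tool arguments, omitting any filter that was not requested."""
    filters: Dict[str, Any] = {}
    if dag_ids:
        filters["dag_ids"] = dag_ids
    if state:
        filters["state"] = state
    if lookback_hours is not None:
        # Express the window as explicit ISO timestamps (hypothetical convenience).
        end = now or datetime.now(timezone.utc)
        start = end - timedelta(hours=lookback_hours)
        filters["start_date_gte"] = start.isoformat()
        filters["start_date_lte"] = end.isoformat()
    if duration_gte is not None:
        filters["duration_gte"] = duration_gte  # seconds
    return filters


# Failed or retrying tasks from one DAG that started in the last 24 hours
# and ran for at least five minutes. The fixed "now" keeps the output stable.
example = build_batch_filters(
    dag_ids=["etl_daily"],
    state=["failed", "up_for_retry"],
    lookback_hours=24,
    duration_gte=300,
    now=datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc),
)
print(example)
```

Leaving unused filters out of the dictionary, rather than passing `None`, matches the fact that every argument is optional.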
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
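| dag_ids | No | List of DAG IDs to filter by | |
| dag_run_ids | No | List of DAG run IDs to filter by | |
| duration_gte | No | Duration greater than or equal to (seconds) | |
| duration_lte | No | Duration less than or equal to (seconds) | |
| end_date_gte | No | End date greater than or equal to (ISO format) | |
| end_date_lte | No | End date less than or equal to (ISO format) | |
| execution_date_gte | No | Execution date greater than or equal to (ISO format) | |
| execution_date_lte | No | Execution date less than or equal to (ISO format) | |
| pool | No | List of pool names to filter by | |
| queue | No | List of queue names to filter by | |
| start_date_gte | No | Start date greater than or equal to (ISO format) | |
| start_date_lte | No | Start date less than or equal to (ISO format) | |
| state | No | List of task states to filter by | |
| task_ids | No | List of task IDs to filter by | |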
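Because every field in the schema is optional, a misspelled argument name is easy to miss. The sketch below shows one way a caller might check arguments before invoking the tool. `check_arguments`, `ALLOWED_ARGS`, and `LIST_ARGS` are hypothetical names, not part of the server; the allowed names are taken from the table, and the list-valued fields from the argument descriptions in the Instructions section.

```python
from typing import Any, Dict, List

# Argument names copied from the Input Schema table.
ALLOWED_ARGS = {
    "dag_ids", "dag_run_ids", "task_ids",
    "execution_date_gte", "execution_date_lte",
    "start_date_gte", "start_date_lte",
    "end_date_gte", "end_date_lte",
    "duration_gte", "duration_lte",
    "state", "pool", "queue",
}
# Arguments described as lists in the tool instructions.
LIST_ARGS = {"dag_ids", "dag_run_ids", "task_ids", "state", "pool", "queue"}


def check_arguments(args: Dict[str, Any]) -> List[str]:
    """Return a list of problems found; an empty list means no problem was detected."""
    problems: List[str] = []
    for name, value in args.items():
        if name not in ALLOWED_ARGS:
            problems.append(f"unknown argument: {name}")
        elif name in LIST_ARGS and not isinstance(value, list):
            problems.append(f"{name} should be a list")
    # A lower bound above the upper bound can never match anything.
    lo, hi = args.get("duration_gte"), args.get("duration_lte")
    if lo is not None and hi is not None and lo > hi:
        problems.append("duration_gte is greater than duration_lte")
    return problems


print(check_arguments({"dag_id": "etl_daily", "state": "failed"}))
```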
Implementation Reference
- The handler function for the list_task_instances_batch tool. It queries the Airflow API for task instances with pagination, date range, and state filters, computes a state summary, and returns the enriched data.

  ```python
  @mcp.tool()
  async def list_task_instances_batch(
      limit: int = 100,
      offset: int = 0,
      start_date_gte: Optional[str] = None,
      start_date_lte: Optional[str] = None,
      state: Optional[List[str]] = None
  ) -> Dict[str, Any]:
      """[Tool Role]: Lists task instances in batch with date and state filtering."""
      params = {'limit': limit, 'offset': offset}
      if start_date_gte:
          params['start_date_gte'] = start_date_gte
      if start_date_lte:
          params['start_date_lte'] = start_date_lte
      if state:
          params['state'] = state

      query_string = "&".join([f"{k}={v}" for k, v in params.items()])
      resp = await airflow_request("GET", f"/taskInstances?{query_string}")
      resp.raise_for_status()
      data = resp.json()

      # Add summary statistics
      task_instances = data.get("task_instances", [])
      state_summary = {}
      for task in task_instances:
          task_state = task.get("state", "unknown")
          state_summary[task_state] = state_summary.get(task_state, 0) + 1

      data["state_summary"] = state_summary
      return data
  ```
- src/mcp_airflow_api/tools/v1_tools.py:19-23 (registration) Registration for Airflow API v1: sets the v1-specific airflow_request function and calls register_common_tools(mcp), which registers the list_task_instances_batch tool.

  ```python
  # Set the global request function to v1
  common_tools.airflow_request = airflow_request_v1

  # Register all 56 common tools (includes management tools)
  common_tools.register_common_tools(mcp)
  ```

- src/mcp_airflow_api/tools/v2_tools.py:20-24 (registration) Registration for Airflow API v2: sets the v2-specific airflow_request function and calls register_common_tools(mcp), which registers the list_task_instances_batch tool.

  ```python
  # Set the global request function to v2
  common_tools.airflow_request = airflow_request_v2

  # Register all 43 common tools
  common_tools.register_common_tools(mcp)
  ```

- src/mcp_airflow_api/tools/common_tools.py:21-27 (registration) The register_common_tools function, which defines and registers all common tools, including list_task_instances_batch, via the @mcp.tool() decorator.

  ```python
  def register_common_tools(mcp):
      """Register all 43 common tools that work with both v1 and v2 APIs."""

      if airflow_request is None:
          raise RuntimeError("airflow_request function must be set before registering common tools")

      logger.info("Registering common tools shared between v1 and v2")
  ```
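The handler listed above builds its query string by joining `key=value` pairs directly, so a list-valued `state` is rendered with Python's list syntax and the `:` and `+` characters in ISO timestamps are left unescaped. The sketch below contrasts that with the standard library's `urllib.parse.urlencode`. It is an illustration, not the project's code: `build_query_string` is a hypothetical helper, and whether the endpoint expects list filters as repeated keys is an assumption.

```python
from typing import Any, Dict
from urllib.parse import urlencode


def build_query_string(params: Dict[str, Any]) -> str:
    """Encode query parameters with percent-escaping (hypothetical helper)."""
    # doseq=True expands list values into repeated keys, e.g. state=failed&state=running,
    # and reserved characters such as ':' and '+' are percent-encoded.
    return urlencode(params, doseq=True)


params = {
    "limit": 100,
    "offset": 0,
    "start_date_gte": "2024-01-01T00:00:00+00:00",
    "state": ["failed", "running"],
}

# Same joining approach as the handler, shown for comparison.
naive = "&".join(f"{k}={v}" for k, v in params.items())
encoded = build_query_string(params)

print(naive)
print(encoded)
```

In the joined version the `+` of the UTC offset would typically be decoded by a server as a space, which is one reason to prefer an encoder over manual string joining.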