Skip to main content
Glama

list_task_instances_all

Retrieve task instances across all DAGs or specific DAGs with advanced filtering by date, duration, state, pool, queue, and pagination options for Airflow cluster monitoring.

Instructions

[Tool Role]: Lists task instances across all DAGs or filtered by specific DAG with comprehensive filtering options.

IMPORTANT: When users provide natural language dates, calculate relative dates using the current server time context (internally via get_current_time_context):

  • "yesterday" = current_date - 1 day

  • "last week" = current_date - 7 days to current_date - 1 day

  • "last 3 days" = current_date - 3 days to current_date

  • "today" = current_date

Args: dag_id: Filter by DAG ID (optional) dag_run_id: Filter by DAG run ID (optional) execution_date_gte: Filter by execution date greater than or equal to (ISO 8601 format with timezone, e.g., '2024-01-01T00:00:00Z', optional) execution_date_lte: Filter by execution date less than or equal to (ISO 8601 format with timezone, e.g., '2024-01-01T23:59:59Z', optional) start_date_gte: Filter by start date greater than or equal to (ISO 8601 format with timezone, optional) start_date_lte: Filter by start date less than or equal to (ISO 8601 format with timezone, optional) end_date_gte: Filter by end date greater than or equal to (ISO 8601 format with timezone, optional) end_date_lte: Filter by end date less than or equal to (ISO 8601 format with timezone, optional) duration_gte: Filter by duration greater than or equal to (seconds, optional) duration_lte: Filter by duration less than or equal to (seconds, optional) state: Filter by task state (queued, running, success, failed, up_for_retry, up_for_reschedule, upstream_failed, skipped, deferred, scheduled, removed, restarting, optional) pool: Filter by pool name (optional) queue: Filter by queue name (optional) limit: Maximum number of task instances to return (default: 20) offset: Number of task instances to skip for pagination (default: 0)

Returns: List of task instances with comprehensive information: task_instances, total_entries, limit, offset

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
dag_idNo
dag_run_idNo
duration_gteNo
duration_lteNo
end_date_gteNo
end_date_lteNo
execution_date_gteNo
execution_date_lteNo
limitNo
offsetNo
poolNo
queueNo
start_date_gteNo
start_date_lteNo
stateNo

Implementation Reference

  • The core handler function for the 'list_task_instances_all' tool. It constructs query parameters from inputs and makes a GET request to Airflow's /taskInstances endpoint.
    @mcp.tool() async def list_task_instances_all( dag_id: Optional[str] = None, dag_run_id: Optional[str] = None, task_id: Optional[str] = None, execution_date_gte: Optional[str] = None, execution_date_lte: Optional[str] = None, start_date_gte: Optional[str] = None, start_date_lte: Optional[str] = None, end_date_gte: Optional[str] = None, end_date_lte: Optional[str] = None, duration_gte: Optional[float] = None, duration_lte: Optional[float] = None, state: Optional[List[str]] = None, pool: Optional[List[str]] = None, limit: int = 100, offset: int = 0 ) -> Dict[str, Any]: """[Tool Role]: Lists task instances with comprehensive filtering options.""" params = {'limit': limit, 'offset': offset} # Add all filter parameters if dag_id: params['dag_id'] = dag_id if dag_run_id: params['dag_run_id'] = dag_run_id if task_id: params['task_id'] = task_id if execution_date_gte: params['execution_date_gte'] = execution_date_gte if execution_date_lte: params['execution_date_lte'] = execution_date_lte if start_date_gte: params['start_date_gte'] = start_date_gte if start_date_lte: params['start_date_lte'] = start_date_lte if end_date_gte: params['end_date_gte'] = end_date_gte if end_date_lte: params['end_date_lte'] = end_date_lte if duration_gte: params['duration_gte'] = duration_gte if duration_lte: params['duration_lte'] = duration_lte if state: params['state'] = state if pool: params['pool'] = pool query_string = "&".join([f"{k}={v}" for k, v in params.items()]) resp = await airflow_request("GET", f"/taskInstances?{query_string}") resp.raise_for_status() return resp.json()
  • Registration call for common tools (including list_task_instances_all) in v1 API context.
    common_tools.register_common_tools(mcp)
  • Registration call for common tools (including list_task_instances_all) in v2 API context.
    common_tools.register_common_tools(mcp)
  • The register_common_tools function defines and registers all common tools, including the handler for list_task_instances_all using @mcp.tool() decorators.
    def register_common_tools(mcp): """Register all 43 common tools that work with both v1 and v2 APIs.""" if airflow_request is None: raise RuntimeError("airflow_request function must be set before registering common tools") logger.info("Registering common tools shared between v1 and v2")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/call518/MCP-Airflow-API'

If you have feedback or need assistance with the MCP directory API, please join our Discord server