
list_task_instances

Find Airflow tasks that ran during specific time windows using flexible date filters across all DAGs and runs in MWAA environments.

Instructions

List task instances across DAGs with flexible time-based filtering.

This is the key tool for finding what tasks were running during a specific time window. Supports wildcards: omit dag_id or dag_run_id to query across all DAGs/runs.
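The wildcard substitution can be sketched as below. It mirrors the `dag_id if dag_id else "~"` logic shown in the implementation reference. The helper name `task_instances_path` and the IDs `etl_daily` and `run_42` are hypothetical. The sketch does not percent-encode IDs, and neither does the implementation shown. Real run IDs often contain `:` and `+`, which may need encoding in a URL path.

```python
from typing import Optional

WILDCARD = "~"  # Airflow REST API path segment meaning "all DAGs" / "all DAG runs"


def task_instances_path(dag_id: Optional[str] = None, dag_run_id: Optional[str] = None) -> str:
    """Build the task-instances endpoint path, substituting '~' for omitted IDs."""
    dag_segment = dag_id or WILDCARD      # empty or None -> all DAGs
    run_segment = dag_run_id or WILDCARD  # empty or None -> all runs
    return f"/dags/{dag_segment}/dagRuns/{run_segment}/taskInstances"


print(task_instances_path())                       # /dags/~/dagRuns/~/taskInstances
print(task_instances_path("etl_daily"))            # /dags/etl_daily/dagRuns/~/taskInstances
print(task_instances_path("etl_daily", "run_42"))  # /dags/etl_daily/dagRuns/run_42/taskInstances
```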

Args:

  • environment_name: Name of the MWAA environment
  • dag_id: Filter by DAG ID (optional; omit for all DAGs)
  • dag_run_id: Filter by DAG run ID (optional; omit for all runs)
  • start_date_gte: Tasks that started at or after this time (ISO 8601)
  • start_date_lte: Tasks that started at or before this time (ISO 8601)
  • end_date_gte: Tasks that ended at or after this time (ISO 8601)
  • end_date_lte: Tasks that ended at or before this time (ISO 8601)
  • execution_date_gte: Tasks whose execution (logical) date is at or after this time (ISO 8601)
  • execution_date_lte: Tasks whose execution (logical) date is at or before this time (ISO 8601)
  • state: Filter by one or more states (queued, running, success, failed, etc.)
  • pool: Filter by pool name
  • queue: Filter by queue name
  • duration_gte: Minimum duration in seconds
  • duration_lte: Maximum duration in seconds
  • limit: Number of items to return (default 100)
  • offset: Number of items to skip, for pagination (default 0)

Returns: A dictionary containing the list of matching task instances and their details.

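Example - Find tasks that overlapped the 02:30-02:40 UTC window on 2024-01-15:

    list_task_instances(
        environment_name="my-env",
        start_date_lte="2024-01-15T02:40:00Z",  # Started at or before 02:40
        end_date_gte="2024-01-15T02:30:00Z",    # Ended at or after 02:30
    )

A task that is still running has no end date yet, so the end_date_gte filter may exclude it. To catch in-progress tasks, run a second query with state=["running"] and the same start_date_lte.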
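The example above relies on the standard interval-overlap test. A task overlaps the window if it started no later than the window's end and ended no earlier than the window's start. The sketch below checks that condition locally and converts a UTC window into the two filter strings. The helper names `overlaps_window`, `window_params`, and `utc` are hypothetical. Here `end=None` models a still-running task and is treated as open-ended. That makes this local check more permissive than an `end_date_gte` query, which compares against a missing end date and may not match such rows.

```python
from datetime import datetime, timezone
from typing import Dict, Optional


def overlaps_window(
    start: datetime,
    end: Optional[datetime],
    window_start: datetime,
    window_end: datetime,
) -> bool:
    """True if a task's [start, end] interval overlaps [window_start, window_end]."""
    started_in_time = start <= window_end                # corresponds to start_date_lte
    ended_in_time = end is None or end >= window_start   # end_date_gte, or still running
    return started_in_time and ended_in_time


def window_params(window_start: datetime, window_end: datetime) -> Dict[str, str]:
    """Translate a window into the two ISO 8601 filters used in the example above."""
    def iso(dt: datetime) -> str:
        # Normalize to UTC and use the 'Z' suffix, matching the example's format.
        return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
    return {"start_date_lte": iso(window_end), "end_date_gte": iso(window_start)}


def utc(hh: int, mm: int) -> datetime:
    return datetime(2024, 1, 15, hh, mm, tzinfo=timezone.utc)


window = (utc(2, 30), utc(2, 40))
print(overlaps_window(utc(2, 0), utc(2, 35), *window))   # True: ended inside the window
print(overlaps_window(utc(2, 0), utc(2, 20), *window))   # False: ended before 02:30
print(overlaps_window(utc(2, 45), utc(3, 0), *window))   # False: started after 02:40
print(overlaps_window(utc(2, 10), None, *window))        # True: still running
print(window_params(*window))
```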

Input Schema

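| Name | Required | Description | Default |
|---|---|---|---|
| environment_name | Yes | Name of the MWAA environment | |
| dag_id | No | Filter by DAG ID; omit for all DAGs | |
| dag_run_id | No | Filter by DAG run ID; omit for all runs | |
| start_date_gte | No | Tasks that started at or after this time (ISO 8601) | |
| start_date_lte | No | Tasks that started at or before this time (ISO 8601) | |
| end_date_gte | No | Tasks that ended at or after this time (ISO 8601) | |
| end_date_lte | No | Tasks that ended at or before this time (ISO 8601) | |
| execution_date_gte | No | Execution (logical) date at or after this time (ISO 8601) | |
| execution_date_lte | No | Execution (logical) date at or before this time (ISO 8601) | |
| state | No | One or more task states (queued, running, success, failed, etc.) | |
| pool | No | Pool name | |
| queue | No | Queue name | |
| duration_gte | No | Minimum duration in seconds | |
| duration_lte | No | Maximum duration in seconds | |
| limit | No | Number of items to return | 100 |
| offset | No | Number of items to skip, for pagination | 0 |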
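The `limit` and `offset` parameters support paging through large result sets. The sketch below is one way to do that under stated assumptions. It assumes each page is a dictionary with a `task_instances` list, the shape of Airflow's task-instance collection response. Whether this tool returns that body unwrapped is not shown in the source. `fetch_page` and `fake_fetch` are hypothetical stand-ins for calls to `list_task_instances`.

```python
from typing import Any, Callable, Dict, Iterator, List

# Hypothetical fetcher: fetch_page(limit, offset) -> Airflow-style response dict.
FetchPage = Callable[[int, int], Dict[str, Any]]


def iter_task_instances(fetch_page: FetchPage, page_size: int = 100) -> Iterator[Dict[str, Any]]:
    """Yield task instances page by page using limit/offset pagination."""
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        items: List[Dict[str, Any]] = page.get("task_instances", [])
        yield from items
        if len(items) < page_size:
            break  # a short (or empty) page means there is nothing left to fetch
        offset += page_size


# Offline stand-in: 250 fake task instances served in slices.
FAKE = [{"task_id": f"task_{i}"} for i in range(250)]


def fake_fetch(limit: int, offset: int) -> Dict[str, Any]:
    return {"task_instances": FAKE[offset:offset + limit], "total_entries": len(FAKE)}


print(sum(1 for _ in iter_task_instances(fake_fetch)))  # 250
```

The short-page check avoids depending on `total_entries`. It does assume the server honors the requested page size. Airflow caps `limit` at its `[api] maximum_page_limit` setting, which defaults to 100. A larger `page_size` could therefore get a truncated first page and stop early, which is why the sketch defaults to 100.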

Implementation Reference

  • The 'list_task_instances' method implements the logic to fetch task instances from the Airflow API, supporting various filters and wildcards for DAG IDs and run IDs.
    async def list_task_instances(
        self,
        environment_name: str,
        dag_id: Optional[str] = None,
        dag_run_id: Optional[str] = None,
        start_date_gte: Optional[str] = None,
        start_date_lte: Optional[str] = None,
        end_date_gte: Optional[str] = None,
        end_date_lte: Optional[str] = None,
        execution_date_gte: Optional[str] = None,
        execution_date_lte: Optional[str] = None,
        state: Optional[List[str]] = None,
        pool: Optional[str] = None,
        queue: Optional[str] = None,
        duration_gte: Optional[float] = None,
        duration_lte: Optional[float] = None,
        limit: Optional[int] = 100,
        offset: Optional[int] = 0,
    ) -> Dict[str, Any]:
        """List task instances across DAGs with flexible filtering via the Airflow REST API.

        Calls the list endpoint GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances,
        which accepts '~' as a wildcard path segment:
        - dag_id='~' means all DAGs
        - dag_run_id='~' means all DAG runs

        This enables time-range queries that find every task running in a given window.
        """
        # Use wildcards if not specified
        dag_path = dag_id if dag_id else "~"
        run_path = dag_run_id if dag_run_id else "~"
        
        params: Dict[str, Any] = {
            "limit": limit,
            "offset": offset,
        }
        
        # Time-based filters
        if start_date_gte:
            params["start_date_gte"] = start_date_gte
        if start_date_lte:
            params["start_date_lte"] = start_date_lte
        if end_date_gte:
            params["end_date_gte"] = end_date_gte
        if end_date_lte:
            params["end_date_lte"] = end_date_lte
        if execution_date_gte:
            params["execution_date_gte"] = execution_date_gte
        if execution_date_lte:
            params["execution_date_lte"] = execution_date_lte
            
        # State and resource filters
        if state:
            params["state"] = state
        if pool:
            params["pool"] = pool
        if queue:
            params["queue"] = queue
            
        # Duration filters
        if duration_gte is not None:
            params["duration_gte"] = duration_gte
        if duration_lte is not None:
            params["duration_lte"] = duration_lte
        
        endpoint = f"/dags/{dag_path}/dagRuns/{run_path}/taskInstances"
        return self._invoke_airflow_api(environment_name, "GET", endpoint, params=params)
  • Tool registration and handler wrapper for 'list_task_instances' in the MCP server.
    @mcp.tool(name="list_task_instances")
    async def list_task_instances(
        environment_name: str,
        dag_id: Optional[str] = None,
        dag_run_id: Optional[str] = None,
        start_date_gte: Optional[str] = None,
        start_date_lte: Optional[str] = None,
        end_date_gte: Optional[str] = None,
        end_date_lte: Optional[str] = None,
        execution_date_gte: Optional[str] = None,
        execution_date_lte: Optional[str] = None,
        state: Optional[List[str]] = None,
        pool: Optional[str] = None,
        queue: Optional[str] = None,
        duration_gte: Optional[float] = None,
        duration_lte: Optional[float] = None,
        limit: Optional[int] = 100,
        offset: Optional[int] = 0,
    ) -> Dict[str, Any]:
        """List task instances across DAGs with flexible time-based filtering.
    
        This is the key tool for finding what tasks were running during a specific time window.
        Supports wildcards: omit dag_id or dag_run_id to query across all DAGs/runs.
    
        Args:
            environment_name: Name of the MWAA environment
            dag_id: Filter by DAG ID (optional - omit for all DAGs)
            dag_run_id: Filter by DAG run ID (optional - omit for all runs)
            start_date_gte: Tasks that started at or after this time (ISO format)
            start_date_lte: Tasks that started at or before this time (ISO format)
            end_date_gte: Tasks that ended at or after this time (ISO format)
            end_date_lte: Tasks that ended at or before this time (ISO format)
            execution_date_gte: Filter by execution/logical date >= (ISO format)
            execution_date_lte: Filter by execution/logical date <= (ISO format)
            state: Filter by state (queued, running, success, failed, etc.)
            pool: Filter by pool name
            queue: Filter by queue name
            duration_gte: Filter by minimum duration in seconds
            duration_lte: Filter by maximum duration in seconds
            limit: Number of items to return (default 100)
            offset: Number of items to skip for pagination
    
        Returns:
            Dictionary containing list of task instances with details
    
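        Example - Find tasks that overlapped 02:30-02:40 UTC on 2024-01-15:
            list_task_instances(
                environment_name="my-env",
                start_date_lte="2024-01-15T02:40:00Z",  # Started at or before 02:40
                end_date_gte="2024-01-15T02:30:00Z",   # Ended at or after 02:30
            )

        Tasks that are still running have no end_date, so end_date_gte may
        exclude them; query again with state=["running"] to include them.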
        """
        limit_int = int(limit) if limit is not None else 100
        offset_int = int(offset) if offset is not None else 0
        duration_gte_float = float(duration_gte) if duration_gte is not None else None
        duration_lte_float = float(duration_lte) if duration_lte is not None else None
    
        return await tools.list_task_instances(
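The source does not show the `_invoke_airflow_api` helper that the implementation calls. The sketch below assumes it wraps the boto3 MWAA client's `invoke_rest_api` operation. That operation takes `Name`, `Path`, `Method`, and optional `QueryParameters`, and returns `RestApiStatusCode` and `RestApiResponse`. The wrapper name `invoke_airflow_get` and the `FakeMwaaClient` stand-in are hypothetical. The client is injected so the sketch runs offline.

```python
from typing import Any, Dict, List, Optional, Protocol


class MwaaClient(Protocol):
    """The one boto3 MWAA client method this sketch relies on."""

    def invoke_rest_api(self, **kwargs: Any) -> Dict[str, Any]: ...


def invoke_airflow_get(
    client: MwaaClient,
    environment_name: str,
    path: str,
    params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """GET an Airflow REST API path through MWAA and return the decoded JSON body."""
    request: Dict[str, Any] = {"Name": environment_name, "Path": path, "Method": "GET"}
    if params:
        request["QueryParameters"] = params  # only sent when there is something to filter on
    response = client.invoke_rest_api(**request)
    status = response.get("RestApiStatusCode")
    # Defensive check: treat a missing or non-2xx status as an error.
    if status is None or not 200 <= status < 300:
        raise RuntimeError(f"Airflow API call to {path} returned status {status}")
    return response["RestApiResponse"]


class FakeMwaaClient:
    """Offline stand-in that records requests and returns a canned response."""

    def __init__(self, status: int = 200) -> None:
        self.status = status
        self.calls: List[Dict[str, Any]] = []

    def invoke_rest_api(self, **kwargs: Any) -> Dict[str, Any]:
        self.calls.append(kwargs)
        return {
            "RestApiStatusCode": self.status,
            "RestApiResponse": {"task_instances": [], "total_entries": 0},
        }
```

With real AWS credentials and a boto3 version that includes this operation, `boto3.client("mwaa")` could be passed as `client` instead of the fake. The call is synchronous. That would be consistent with the implementation calling `self._invoke_airflow_api(...)` without `await`, though the source does not confirm it.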
