
list_task_instances_all

Retrieve task instances from Apache Airflow with filtering by DAG, run, task, date ranges, duration, state, and pool parameters to monitor workflow execution.

Instructions

[Tool Role]: Lists task instances with comprehensive filtering options.

Input Schema

All parameters are optional. Defaults for limit and offset are taken from the handler signature below.

Name                  Required    Default
dag_id                No
dag_run_id            No
task_id               No
execution_date_gte    No
execution_date_lte    No
start_date_gte        No
start_date_lte        No
end_date_gte          No
end_date_lte          No
duration_gte          No
duration_lte          No
state                 No
pool                  No
limit                 No          100
offset                No          0

Output Schema

No output schema is defined. The tool returns the raw JSON response from the Airflow /taskInstances endpoint.
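Since every input field is optional, a client may pass any subset of them. A minimal sketch of an argument payload (all values are hypothetical):

```python
# Hypothetical arguments for list_task_instances_all; any subset of the
# schema's fields may be supplied, and omitted filters are simply not applied.
arguments = {
    "dag_id": "example_etl",                   # filter to one DAG
    "state": ["failed", "up_for_retry"],       # list-valued state filter
    "start_date_gte": "2024-01-01T00:00:00Z",  # ISO 8601 lower bound
    "limit": 50,
}
```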

Implementation Reference

  • Core handler function for the 'list_task_instances_all' tool. Queries the Airflow /taskInstances API endpoint with dynamic filtering parameters for comprehensive task instance listing.
    @mcp.tool()
    async def list_task_instances_all(
        dag_id: Optional[str] = None,
        dag_run_id: Optional[str] = None,
        task_id: Optional[str] = None,
        execution_date_gte: Optional[str] = None,
        execution_date_lte: Optional[str] = None,
        start_date_gte: Optional[str] = None,
        start_date_lte: Optional[str] = None,
        end_date_gte: Optional[str] = None,
        end_date_lte: Optional[str] = None,
        duration_gte: Optional[float] = None,
        duration_lte: Optional[float] = None,
        state: Optional[List[str]] = None,
        pool: Optional[List[str]] = None,
        limit: int = 100,
        offset: int = 0
    ) -> Dict[str, Any]:
        """[Tool Role]: Lists task instances with comprehensive filtering options."""
        from urllib.parse import urlencode

        # Use `is not None` so numeric filters explicitly set to 0 are not
        # dropped (a bare truthiness check would silently skip duration_gte=0).
        params: Dict[str, Any] = {'limit': limit, 'offset': offset}

        optional_params = {
            'dag_id': dag_id,
            'dag_run_id': dag_run_id,
            'task_id': task_id,
            'execution_date_gte': execution_date_gte,
            'execution_date_lte': execution_date_lte,
            'start_date_gte': start_date_gte,
            'start_date_lte': start_date_lte,
            'end_date_gte': end_date_gte,
            'end_date_lte': end_date_lte,
            'duration_gte': duration_gte,
            'duration_lte': duration_lte,
            'state': state,
            'pool': pool,
        }
        for key, value in optional_params.items():
            if value is not None:
                params[key] = value

        # urlencode(doseq=True) expands list values (state, pool) into
        # repeated query parameters and percent-encodes special characters;
        # a plain f-string join would emit the Python list repr instead.
        query_string = urlencode(params, doseq=True)
        resp = await airflow_request("GET", f"/taskInstances?{query_string}")
        resp.raise_for_status()
        return resp.json()
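List-valued filters such as state and pool must reach the Airflow API as repeated query parameters (e.g. state=failed&state=up_for_retry), not as a Python list repr. A small sketch of how urllib.parse.urlencode with doseq=True produces that encoding (parameter values are illustrative):

```python
from urllib.parse import urlencode

params = {
    "limit": 100,
    "offset": 0,
    "dag_id": "example_dag",
    "state": ["failed", "up_for_retry"],  # expanded into repeated keys
}

# doseq=True turns each list element into its own key=value pair,
# preserving dict insertion order.
query_string = urlencode(params, doseq=True)
# query_string == "limit=100&offset=0&dag_id=example_dag&state=failed&state=up_for_retry"
```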
  • Registers the common tools (including list_task_instances_all) for Airflow API v1 by setting the v1-specific airflow_request function and calling register_common_tools.
    # Set the global request function to v1
    common_tools.airflow_request = airflow_request_v1
    
    # Register all 56 common tools (includes management tools)
    common_tools.register_common_tools(mcp)
    
    # V1 has no exclusive tools - all tools are shared with v2
    
    logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
  • Registers the common tools (including list_task_instances_all) for Airflow API v2 by setting the v2-specific airflow_request function and calling register_common_tools.
    # Set the global request function to v2
    common_tools.airflow_request = airflow_request_v2
    
    # Register all 43 common tools
    common_tools.register_common_tools(mcp)
  • The register_common_tools function defines and registers all common MCP tools using @mcp.tool() decorators, including the list_task_instances_all handler.
    def register_common_tools(mcp):
        """Register all 43 common tools that work with both v1 and v2 APIs."""
        
        if airflow_request is None:
            raise RuntimeError("airflow_request function must be set before registering common tools")
        
        logger.info("Registering common tools shared between v1 and v2")
    
        # Template & Prompt Management
        @mcp.tool()
        async def get_prompt_template(section: Optional[str] = None, mode: Optional[str] = None) -> str:
            """
            [Tool Role]: Provides comprehensive prompt template for LLM interactions with Airflow operations.
    
            Args:
                section: Optional section name to get specific part of template
                mode: Optional mode (summary/detailed) to control response verbosity
    
            Returns:
                Comprehensive template or specific section for optimal LLM guidance
            """
            template = read_prompt_template(PROMPT_TEMPLATE_PATH)
            
            if section:
                sections = parse_prompt_sections(template)
                for i, s in enumerate(sections):
                    if section.lower() in s.lower():
                        # +1 skips the title line of the matched section;
                        # guard against a match on the final element.
                        if i + 1 < len(sections):
                            return sections[i + 1]
                        return s
                return f"Section '{section}' not found."
        
            return template
    
        # DAG Management (11 tools)
        @mcp.tool()
        async def list_dags(limit: int = 20,
                      offset: int = 0,
                      fetch_all: bool = False,
                      id_contains: Optional[str] = None,
                      name_contains: Optional[str] = None) -> Dict[str, Any]:
            """
            [Tool Role]: Lists all DAGs registered in the Airflow cluster with pagination support.
        
            Args:
                limit: Maximum number of DAGs to return (default: 20)
                offset: Number of DAGs to skip for pagination (default: 0)
                fetch_all: If True, fetches all DAGs regardless of limit/offset
                id_contains: Filter DAGs by ID containing this string
                name_contains: Filter DAGs by display name containing this string
    
            Returns:
                Dict containing dags list, pagination info, and total counts
            """
            return await list_dags_internal(limit, offset, fetch_all, id_contains, name_contains)
    
        @mcp.tool()
        async def get_dag(dag_id: str) -> Dict[str, Any]:
            """
            [Tool Role]: Retrieves detailed information for a specific DAG.
    
            Args:
                dag_id: The DAG ID to get details for
    
            Returns:
                Comprehensive DAG details
            """
            return await get_dag_detailed_info(dag_id)
    
        @mcp.tool()
        async def get_dags_detailed_batch(
            limit: int = 100,
            offset: int = 0,
            fetch_all: bool = False,
            id_contains: Optional[str] = None,
            name_contains: Optional[str] = None,
            is_active: Optional[bool] = None,
            is_paused: Optional[bool] = None
        ) -> Dict[str, Any]:
            """
            [Tool Role]: Retrieves detailed information for multiple DAGs in batch with latest run information.
            """
            dag_list_result = await list_dags_internal(
                limit=limit, 
                offset=offset, 
                fetch_all=fetch_all,
                id_contains=id_contains,
                name_contains=name_contains
            )
        
            dags_basic = dag_list_result.get("dags", [])
            detailed_dags = []
            success_count = 0
            error_count = 0
            errors = []
            skipped_count = 0
        
            for dag_basic in dags_basic:
                dag_id = dag_basic.get("dag_id")
                if not dag_id:
                    skipped_count += 1
                    continue
                
                if is_active is not None and dag_basic.get("is_active") != is_active:
                    skipped_count += 1
                    continue
                if is_paused is not None and dag_basic.get("is_paused") != is_paused:
                    skipped_count += 1
                    continue
                
                try:
                    detailed_dag = await get_dag_detailed_info(dag_id)
                    
                    try:
                        latest_run_resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date")
                        latest_run_resp.raise_for_status()
                        latest_runs = latest_run_resp.json().get("dag_runs", [])
                    
                        if latest_runs:
                            latest_run = latest_runs[0]
                            detailed_dag["latest_dag_run"] = {
                                "run_id": latest_run.get("run_id"),
                                "run_type": latest_run.get("run_type"),
                                "state": latest_run.get("state"),
                                "execution_date": latest_run.get("execution_date"),
                                "start_date": latest_run.get("start_date"),
                                "end_date": latest_run.get("end_date"),
                                "data_interval_start": latest_run.get("data_interval_start"),
                                "data_interval_end": latest_run.get("data_interval_end"),
                                "external_trigger": latest_run.get("external_trigger"),
                                "conf": latest_run.get("conf"),
                                "note": latest_run.get("note")
                            }
                        else:
                            detailed_dag["latest_dag_run"] = None
                    except Exception:
                        detailed_dag["latest_dag_run"] = None
                    
                    detailed_dags.append(detailed_dag)
                    success_count += 1
                except Exception as e:
                    error_count += 1
                    errors.append({
                        "dag_id": dag_id,
                        "error": str(e)
                    })
        
            return {
                "dags_detailed": detailed_dags,
                "total_processed": success_count,
                "total_available": dag_list_result.get("total_entries", 0),
                "returned_count": len(detailed_dags),
                "processing_stats": {
                    "success_count": success_count,
                    "error_count": error_count,
                    "skipped_count": skipped_count,
                    "errors": errors
                },
                "applied_filters": {
                    "id_contains": id_contains,
                    "name_contains": name_contains,
                    "is_active": is_active,
                    "is_paused": is_paused,
                    "limit": limit,
                    "offset": offset,
                    "fetch_all": fetch_all
                },
                "pagination_info": dag_list_result.get("pagination_info", {}),
                "has_more_pages": dag_list_result.get("has_more_pages", False),
                "next_offset": dag_list_result.get("next_offset")
            }
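The is_active/is_paused filters above are applied client-side after the listing call: any DAG whose flag does not match the requested value is counted as skipped. A minimal sketch of that filtering step with hypothetical data:

```python
# Hypothetical DAG listing entries; only is_paused is shown for brevity.
dags_basic = [
    {"dag_id": "etl_daily", "is_paused": False},
    {"dag_id": "legacy_job", "is_paused": True},
]

is_paused = False  # caller-supplied filter; None would mean "no filter"
kept = [d for d in dags_basic if d.get("is_paused") == is_paused]
# kept contains only the "etl_daily" entry
```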
    
        @mcp.tool()
        async def running_dags() -> Dict[str, Any]:
            """[Tool Role]: Lists all currently running DAG runs in the Airflow cluster."""
            resp = await airflow_request("GET", "/dags/~/dagRuns?state=running&limit=1000&order_by=-start_date")
            resp.raise_for_status()
            data = resp.json()
        
            running_runs = []
            for run in data.get("dag_runs", []):
                run_info = {
                    "dag_id": run.get("dag_id"),
                    "dag_display_name": run.get("dag_display_name"),
                    "run_id": run.get("run_id"),
                    "run_type": run.get("run_type"),
                    "state": run.get("state"),
                    "execution_date": run.get("execution_date"),
                    "start_date": run.get("start_date"),
                    "end_date": run.get("end_date"),
                    "data_interval_start": run.get("data_interval_start"),
                    "data_interval_end": run.get("data_interval_end"),
                    "external_trigger": run.get("external_trigger"),
                    "conf": run.get("conf"),
                    "note": run.get("note")
                }
                running_runs.append(run_info)
        
            return {
                "dag_runs": running_runs,
                "total_running": len(running_runs),
                "query_info": {
                    "state_filter": "running",
                    "limit": 1000,
                    "order_by": "start_date (descending)"
                }
            }
    
        @mcp.tool()
        async def failed_dags() -> Dict[str, Any]:
            """[Tool Role]: Lists all recently failed DAG runs in the Airflow cluster."""
            resp = await airflow_request("GET", "/dags/~/dagRuns?state=failed&limit=1000&order_by=-start_date")
            resp.raise_for_status()
            data = resp.json()
        
            failed_runs = []
            for run in data.get("dag_runs", []):
                run_info = {
                    "dag_id": run.get("dag_id"),
                    "dag_display_name": run.get("dag_display_name"),
                    "run_id": run.get("run_id"),
                    "run_type": run.get("run_type"),
                    "state": run.get("state"),
                    "execution_date": run.get("execution_date"),
                    "start_date": run.get("start_date"),
                    "end_date": run.get("end_date"),
                    "data_interval_start": run.get("data_interval_start"),
                    "data_interval_end": run.get("data_interval_end"),
                    "external_trigger": run.get("external_trigger"),
                    "conf": run.get("conf"),
                    "note": run.get("note")
                }
                failed_runs.append(run_info)
        
            return {
                "dag_runs": failed_runs,
                "total_failed": len(failed_runs),
                "query_info": {
                    "state_filter": "failed",
                    "limit": 1000,
                    "order_by": "start_date (descending)"
                }
            }
    
        @mcp.tool()
        async def trigger_dag(dag_id: str) -> Dict[str, Any]:
            """[Tool Role]: Triggers a new DAG run for a specified Airflow DAG."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            resp = await airflow_request("POST", f"/dags/{dag_id}/dagRuns", json={"conf": {}})
            resp.raise_for_status()
            run = resp.json()
            return {
                "dag_id": dag_id,
                "run_id": run.get("run_id"),
                "state": run.get("state"),
                "execution_date": run.get("execution_date"),
                "start_date": run.get("start_date"),
                "end_date": run.get("end_date")
            }
    
        @mcp.tool()
        async def pause_dag(dag_id: str) -> Dict[str, Any]:
            """[Tool Role]: Pauses the specified Airflow DAG (prevents scheduling new runs)."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            resp = await airflow_request("PATCH", f"/dags/{dag_id}", json={"is_paused": True})
            resp.raise_for_status()
            dag_data = resp.json()
            return {
                "dag_id": dag_id,
                "is_paused": dag_data.get("is_paused")
            }
    
        @mcp.tool()
        async def unpause_dag(dag_id: str) -> Dict[str, Any]:
            """[Tool Role]: Unpauses the specified Airflow DAG (allows scheduling new runs)."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            resp = await airflow_request("PATCH", f"/dags/{dag_id}", json={"is_paused": False})
            resp.raise_for_status()
            dag_data = resp.json()
            return {
                "dag_id": dag_id,
                "is_paused": dag_data.get("is_paused")
            }
    
        @mcp.tool()
        async def dag_graph(dag_id: str) -> Dict[str, Any]:
            """[Tool Role]: Retrieves task graph structure for the specified DAG."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            resp = await airflow_request("GET", f"/dags/{dag_id}/tasks")
            resp.raise_for_status()
            tasks_data = resp.json()
            
            tasks = tasks_data.get("tasks", [])
            task_graph = {}
            
            for task in tasks:
                task_id = task.get("task_id")
                task_graph[task_id] = {
                    "task_id": task_id,
                    "task_type": task.get("class_ref", {}).get("class_name"),
                    "downstream_task_ids": task.get("downstream_task_ids", []),
                    "upstream_task_ids": task.get("upstream_task_ids", [])
                }
            
            return {
                "dag_id": dag_id,
                "task_graph": task_graph,
                "total_tasks": len(tasks),
                "task_relationships": {
                    "nodes": list(task_graph.keys()),
                    "edges": [(task_id, downstream) for task_id, task_info in task_graph.items() 
                             for downstream in task_info["downstream_task_ids"]]
                }
            }
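The edges list in the return value is a flat list of (upstream, downstream) pairs derived from each task's downstream_task_ids. A standalone sketch with a hypothetical three-task graph:

```python
# Hypothetical task graph in the shape built by dag_graph.
task_graph = {
    "extract": {"downstream_task_ids": ["transform"]},
    "transform": {"downstream_task_ids": ["load"]},
    "load": {"downstream_task_ids": []},
}

# One (upstream, downstream) tuple per dependency edge.
edges = [(task_id, downstream)
         for task_id, info in task_graph.items()
         for downstream in info["downstream_task_ids"]]
# edges == [("extract", "transform"), ("transform", "load")]
```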
    
        @mcp.tool()
        async def list_tasks(dag_id: str) -> Dict[str, Any]:
            """[Tool Role]: Lists all tasks within the specified DAG."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            resp = await airflow_request("GET", f"/dags/{dag_id}/tasks")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def dag_code(dag_id: str) -> Dict[str, Any]:
            """[Tool Role]: Retrieves the source code for the specified DAG."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            resp = await airflow_request("GET", f"/dagSources/{dag_id}")
            resp.raise_for_status()
            return resp.json()
    
        # Continue with remaining tools...
        
        # Event/Log Management (3 tools)
        @mcp.tool()
        async def list_event_logs(dag_id: Optional[str] = None, limit: int = 20, offset: int = 0) -> Dict[str, Any]:
            """[Tool Role]: Lists event logs from Airflow."""
            params = {'limit': limit, 'offset': offset}
            if dag_id:
                params['dag_id'] = dag_id
            
            from urllib.parse import urlencode
            query_string = urlencode(params, doseq=True)
            resp = await airflow_request("GET", f"/eventLogs?{query_string}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_event_log(event_log_id: int) -> Dict[str, Any]:
            """[Tool Role]: Retrieves a specific event log entry."""
            resp = await airflow_request("GET", f"/eventLogs/{event_log_id}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def all_dag_event_summary() -> Dict[str, Any]:
            """[Tool Role]: Provides summary of event logs across all DAGs."""
            resp = await airflow_request("GET", "/eventLogs?limit=1000")
            resp.raise_for_status()
            data = resp.json()
            
            event_summary = {}
            for event in data.get("event_logs", []):
                dag_id = event.get("dag_id", "unknown")
                event_type = event.get("event", "unknown")
                
                if dag_id not in event_summary:
                    event_summary[dag_id] = {}
                if event_type not in event_summary[dag_id]:
                    event_summary[dag_id][event_type] = 0
                event_summary[dag_id][event_type] += 1
            
            return {
                "event_summary": event_summary,
                "total_events": len(data.get("event_logs", [])),
                "unique_dags": len(event_summary)
            }
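The nested per-DAG, per-event counting above can be written more compactly with collections.defaultdict; a sketch over hypothetical event-log entries:

```python
from collections import defaultdict

# Hypothetical entries in the shape returned under "event_logs".
event_logs = [
    {"dag_id": "etl", "event": "trigger"},
    {"dag_id": "etl", "event": "trigger"},
    {"dag_id": "etl", "event": "success"},
]

# Missing keys default to a fresh inner counter, so no explicit
# "if dag_id not in summary" checks are needed.
event_summary = defaultdict(lambda: defaultdict(int))
for event in event_logs:
    event_summary[event.get("dag_id", "unknown")][event.get("event", "unknown")] += 1
# event_summary["etl"] counts 2 triggers and 1 success
```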
    
        # Import Error Management (3 tools)
        @mcp.tool()
        async def list_import_errors(limit: int = 20, offset: int = 0) -> Dict[str, Any]:
            """[Tool Role]: Lists import errors in Airflow."""
            params = {'limit': limit, 'offset': offset}
            from urllib.parse import urlencode
            query_string = urlencode(params, doseq=True)
            resp = await airflow_request("GET", f"/importErrors?{query_string}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_import_error(import_error_id: int) -> Dict[str, Any]:
            """[Tool Role]: Retrieves a specific import error."""
            resp = await airflow_request("GET", f"/importErrors/{import_error_id}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def all_dag_import_summary() -> Dict[str, Any]:
            """[Tool Role]: Provides summary of import errors across all DAGs."""
            resp = await airflow_request("GET", "/importErrors?limit=1000")
            resp.raise_for_status()
            data = resp.json()
            
            import_summary = {}
            for error in data.get("import_errors", []):
                filename = error.get("filename", "unknown")
                if filename not in import_summary:
                    import_summary[filename] = []
                # Truncate long stacktraces; only append an ellipsis when
                # something was actually cut off.
                stacktrace = error.get("stacktrace", "")
                if len(stacktrace) > 200:
                    stacktrace = stacktrace[:200] + "..."
                import_summary[filename].append({
                    "id": error.get("id"),
                    "timestamp": error.get("timestamp"),
                    "stacktrace": stacktrace
                })
            
            return {
                "import_errors_summary": import_summary,
                "total_errors": len(data.get("import_errors", [])),
                "files_with_errors": len(import_summary)
            }
    
        # Analysis/Statistics (3 tools)
        @mcp.tool()
        async def dag_run_duration(dag_id: str, limit: int = 10) -> Dict[str, Any]:
            """[Tool Role]: Analyzes DAG run durations and performance metrics."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            
            resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit={limit}&order_by=-execution_date")
            resp.raise_for_status()
            data = resp.json()
            
            runs = data.get("dag_runs", [])
            durations = []
            
            for run in runs:
                start_date = run.get("start_date")
                end_date = run.get("end_date")
                if start_date and end_date:
                    from datetime import datetime
                    start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
                    end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
                    duration_seconds = (end - start).total_seconds()
                    durations.append({
                        "run_id": run.get("run_id"),
                        "duration_seconds": duration_seconds,
                        "state": run.get("state"),
                        "execution_date": run.get("execution_date")
                    })
            
            avg_duration = sum(d["duration_seconds"] for d in durations) / len(durations) if durations else 0
            
            return {
                "dag_id": dag_id,
                "run_durations": durations,
                "statistics": {
                    "average_duration_seconds": avg_duration,
                    "total_analyzed_runs": len(durations),
                    "fastest_run": min(durations, key=lambda x: x["duration_seconds"]) if durations else None,
                    "slowest_run": max(durations, key=lambda x: x["duration_seconds"]) if durations else None
                }
            }
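Airflow timestamps are ISO 8601 strings with a trailing Z, which datetime.fromisoformat in older Python versions cannot parse directly; hence the replace('Z', '+00:00') step. A self-contained sketch of the duration arithmetic:

```python
from datetime import datetime

# Hypothetical start/end timestamps in the format the Airflow API returns.
start_date = "2024-01-01T00:00:00Z"
end_date = "2024-01-01T00:05:30Z"

# Rewrite the Z suffix as an explicit UTC offset before parsing.
start = datetime.fromisoformat(start_date.replace("Z", "+00:00"))
end = datetime.fromisoformat(end_date.replace("Z", "+00:00"))
duration_seconds = (end - start).total_seconds()
# duration_seconds == 330.0
```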
    
        @mcp.tool()
        async def dag_task_duration(dag_id: str, dag_run_id: Optional[str] = None) -> Dict[str, Any]:
            """[Tool Role]: Analyzes task durations within a DAG run."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            
            if not dag_run_id:
                # Get the latest run
                resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date")
                resp.raise_for_status()
                runs = resp.json().get("dag_runs", [])
                if not runs:
                    return {"error": f"No DAG runs found for DAG {dag_id}"}
                dag_run_id = runs[0]["run_id"]
            
            resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances")
            resp.raise_for_status()
            data = resp.json()
            
            task_durations = []
            for task in data.get("task_instances", []):
                start_date = task.get("start_date")
                end_date = task.get("end_date")
                if start_date and end_date:
                    from datetime import datetime
                    start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
                    end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
                    duration_seconds = (end - start).total_seconds()
                    task_durations.append({
                        "task_id": task.get("task_id"),
                        "duration_seconds": duration_seconds,
                        "state": task.get("state"),
                        "start_date": start_date,
                        "end_date": end_date
                    })
            
            return {
                "dag_id": dag_id,
                "dag_run_id": dag_run_id,
                "task_durations": task_durations,
                "total_tasks": len(task_durations)
            }
    
        @mcp.tool()
        async def dag_calendar(dag_id: str, start_date: str, end_date: str) -> Dict[str, Any]:
            """[Tool Role]: Shows DAG schedule and execution calendar for a date range."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            
            params = {
                'start_date_gte': start_date,
                'start_date_lte': end_date,
                'limit': 1000
            }
            from urllib.parse import urlencode
            query_string = urlencode(params, doseq=True)
            
            resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?{query_string}")
            resp.raise_for_status()
            data = resp.json()
            
            calendar_data = []
            for run in data.get("dag_runs", []):
                calendar_data.append({
                    "execution_date": run.get("execution_date"),
                    "start_date": run.get("start_date"),
                    "end_date": run.get("end_date"),
                    "state": run.get("state"),
                    "run_type": run.get("run_type")
                })
            
            return {
                "dag_id": dag_id,
                "date_range": {"start": start_date, "end": end_date},
                "calendar_entries": calendar_data,
                "total_runs": len(calendar_data)
            }
    
        # System Information (6 tools)
        @mcp.tool()
        async def get_health() -> Dict[str, Any]:
            """[Tool Role]: Checks Airflow cluster health status."""
            # Import here to avoid circular imports
            from ..functions import get_api_version
            
            api_version = get_api_version()
            
            if api_version == "v2":
                # v2 API: Use /monitor/health endpoint (Airflow 3.x)
                resp = await airflow_request("GET", "/monitor/health")
            else:
                # v1 API: Use /health endpoint (Airflow 2.x)
                resp = await airflow_request("GET", "/health")
            
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_version() -> Dict[str, Any]:
            """[Tool Role]: Gets Airflow version information."""
            resp = await airflow_request("GET", "/version")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_config() -> Dict[str, Any]:
            """[Tool Role]: Retrieves Airflow configuration."""
            resp = await airflow_request("GET", "/config")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def list_config_sections() -> Dict[str, Any]:
            """[Tool Role]: Lists all configuration sections with summary."""
            try:
                resp = await airflow_request("GET", "/config")
                resp.raise_for_status()
                config_data = resp.json()
                
                sections_summary = {}
                for section_name, section_data in config_data.get("sections", {}).items():
                    options_count = len(section_data.get("options", {}))
                    sections_summary[section_name] = {
                        "options_count": options_count,
                        "sample_options": list(section_data.get("options", {}).keys())[:5]
                    }
                
                return {
                    "sections_summary": sections_summary,
                    "total_sections": len(sections_summary)
                }
            except Exception as e:
                return {
                    "error": f"Configuration access denied: {str(e)}",
                    "note": "This requires 'expose_config = True' in airflow.cfg [webserver] section"
                }
    
        @mcp.tool()
        async def get_config_section(section_name: str) -> Dict[str, Any]:
            """[Tool Role]: Gets all options within a specific configuration section."""
            try:
                resp = await airflow_request("GET", "/config")
                resp.raise_for_status()
                config_data = resp.json()
                
                section_data = config_data.get("sections", {}).get(section_name)
                if not section_data:
                    return {"error": f"Section '{section_name}' not found"}
                
                return {
                    "section_name": section_name,
                    "options": section_data.get("options", {}),
                    "options_count": len(section_data.get("options", {}))
                }
            except Exception as e:
                return {
                    "error": f"Configuration access denied: {str(e)}",
                    "note": "This requires 'expose_config = True' in airflow.cfg [webserver] section"
                }
    
        @mcp.tool()
        async def search_config_options(search_term: str) -> Dict[str, Any]:
            """[Tool Role]: Searches for configuration options matching a term."""
            try:
                resp = await airflow_request("GET", "/config")
                resp.raise_for_status()
                config_data = resp.json()
                
                matching_options = {}
                for section_name, section_data in config_data.get("sections", {}).items():
                    section_matches = {}
                    for option_name, option_data in section_data.get("options", {}).items():
                        if search_term.lower() in option_name.lower() or search_term.lower() in str(option_data.get("value", "")).lower():
                            section_matches[option_name] = option_data
                    
                    if section_matches:
                        matching_options[section_name] = section_matches
                
                return {
                    "search_term": search_term,
                    "matching_options": matching_options,
                    "total_matches": sum(len(section) for section in matching_options.values())
                }
            except Exception as e:
                return {
                    "error": f"Configuration access denied: {str(e)}",
                    "note": "This requires 'expose_config = True' in airflow.cfg [webserver] section"
                }
    
        # Pool Management (2 tools)
        @mcp.tool()
        async def list_pools(limit: int = 20, offset: int = 0) -> Dict[str, Any]:
            """[Tool Role]: Lists all pools in Airflow."""
            params = {'limit': limit, 'offset': offset}
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            resp = await airflow_request("GET", f"/pools?{query_string}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_pool(pool_name: str) -> Dict[str, Any]:
            """[Tool Role]: Gets details for a specific pool."""
            resp = await airflow_request("GET", f"/pools/{pool_name}")
            resp.raise_for_status()
            return resp.json()
    
        # Task Instance Management (5 tools)
        @mcp.tool()
        async def list_task_instances_all(
            dag_id: Optional[str] = None,
            dag_run_id: Optional[str] = None,
            task_id: Optional[str] = None,
            execution_date_gte: Optional[str] = None,
            execution_date_lte: Optional[str] = None,
            start_date_gte: Optional[str] = None,
            start_date_lte: Optional[str] = None,
            end_date_gte: Optional[str] = None,
            end_date_lte: Optional[str] = None,
            duration_gte: Optional[float] = None,
            duration_lte: Optional[float] = None,
            state: Optional[List[str]] = None,
            pool: Optional[List[str]] = None,
            limit: int = 100,
            offset: int = 0
        ) -> Dict[str, Any]:
            """[Tool Role]: Lists task instances with comprehensive filtering options."""
            params = {'limit': limit, 'offset': offset}
            
            # Add all filter parameters
            if dag_id:
                params['dag_id'] = dag_id
            if dag_run_id:
                params['dag_run_id'] = dag_run_id
            if task_id:
                params['task_id'] = task_id
            if execution_date_gte:
                params['execution_date_gte'] = execution_date_gte
            if execution_date_lte:
                params['execution_date_lte'] = execution_date_lte
            if start_date_gte:
                params['start_date_gte'] = start_date_gte
            if start_date_lte:
                params['start_date_lte'] = start_date_lte
            if end_date_gte:
                params['end_date_gte'] = end_date_gte
            if end_date_lte:
                params['end_date_lte'] = end_date_lte
            if duration_gte is not None:  # explicit None check: 0.0 is a valid bound
                params['duration_gte'] = duration_gte
            if duration_lte is not None:
                params['duration_lte'] = duration_lte
            if state:
                params['state'] = state
            if pool:
                params['pool'] = pool
            
            from urllib.parse import urlencode  # local import; hoist to module scope in practice

            # doseq=True expands list-valued filters (state, pool) into
            # repeated query keys, as the Airflow REST API expects; a
            # plain f-string join would emit e.g. state=['failed'].
            query_string = urlencode(params, doseq=True)
            resp = await airflow_request("GET", f"/taskInstances?{query_string}")
            resp.raise_for_status()
            return resp.json()
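
A standalone check of the query-string construction makes the multi-value behavior concrete: the Airflow REST API expects list-valued filters such as state and pool to appear as repeated keys, which `urllib.parse.urlencode` with `doseq=True` produces (the filter values here are placeholders):

```python
from urllib.parse import urlencode

# Hypothetical filter set; list values must become repeated query keys.
params = {
    "dag_id": "example_dag",
    "state": ["failed", "up_for_retry"],
    "limit": 100,
    "offset": 0,
}

query_string = urlencode(params, doseq=True)
print(query_string)
# dag_id=example_dag&state=failed&state=up_for_retry&limit=100&offset=0
```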
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden. It mentions 'comprehensive filtering options', which hints at the tool's capabilities, but does not disclose important behavioral traits: whether the operation is read-only, how pagination works (despite the limit/offset parameters), performance characteristics, or authentication requirements. The description adds minimal value beyond the tool name.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.
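
Pagination is one behavior worth spelling out. A minimal offset-based loop, with a stubbed fetch standing in for the real `/taskInstances` call, sketches what an agent has to do to drain all results:

```python
from typing import Any, Dict, List

def fetch_page(limit: int, offset: int) -> Dict[str, Any]:
    # Stub standing in for GET /taskInstances?limit=...&offset=...
    data = [{"task_id": f"t{i}"} for i in range(25)]
    return {
        "task_instances": data[offset:offset + limit],
        "total_entries": len(data),
    }

def fetch_all(page_size: int = 10) -> List[Dict[str, Any]]:
    items: List[Dict[str, Any]] = []
    offset = 0
    while True:
        page = fetch_page(limit=page_size, offset=offset)
        items.extend(page["task_instances"])
        if offset + page_size >= page["total_entries"]:
            break
        offset += page_size
    return items

print(len(fetch_all()))  # 25
```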

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is extremely concise: just one sentence. While efficient, this is arguably too brief for a tool with 15 parameters and no schema descriptions. The single sentence is front-loaded with the core purpose but lacks the elaboration such a complex tool needs.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

For a tool with 15 parameters, 0% schema description coverage, no annotations, and complex filtering capabilities, the description is severely inadequate. While an output schema exists, the description doesn't provide enough context about the tool's behavior, parameter usage, or differentiation from similar tools to enable effective use.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

With 15 parameters and 0% schema description coverage, the description fails to compensate for the schema's lack of documentation. 'Comprehensive filtering options' vaguely references the parameters but provides no specifics about what each parameter does, how they interact, or what values are acceptable. The description adds almost no semantic value beyond what's evident from parameter names.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.
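
One concrete remedy is to publish per-parameter descriptions in the input schema itself. The fragment below is a hand-written illustration of what that could look like; the wording is hypothetical, not the server's actual schema:

```python
import json

# Hypothetical fragment of a documented input schema; field names
# mirror the tool's parameters, descriptions are illustrative.
input_schema = {
    "type": "object",
    "properties": {
        "dag_id": {
            "type": "string",
            "description": "Exact DAG id to filter by; omit to search all DAGs.",
        },
        "execution_date_gte": {
            "type": "string",
            "format": "date-time",
            "description": "Lower bound on execution date, ISO 8601 (e.g. 2024-01-01T00:00:00Z).",
        },
        "limit": {
            "type": "integer",
            "default": 100,
            "description": "Page size; combine with offset to paginate.",
        },
    },
}

print(json.dumps(input_schema, indent=2))
```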

Purpose: 3/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description states the tool 'Lists task instances' which is a clear verb+resource combination, but it's vague about what 'comprehensive filtering options' means. It doesn't distinguish this tool from sibling 'list_task_instances_batch' or other listing tools, leaving ambiguity about when to use this specific tool.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No guidance is provided about when to use this tool versus alternatives. With sibling tools like 'list_task_instances_batch' available, the description offers no comparison or context about different use cases. The mention of 'comprehensive filtering' is too vague to serve as meaningful guidance.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
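
As a sketch, a docstring that bakes in this guidance might read as follows; the behavioral claims are illustrative, inferred from the code above rather than taken from the project:

```python
async def list_task_instances_all(
    dag_id=None, state=None, limit=100, offset=0, **filters
):
    """List task instances across all DAGs with server-side filtering.

    Read-only: performs a single GET to /taskInstances; nothing is
    mutated. Uses the same Airflow REST API credentials as every other
    tool in this server.

    Use this tool for ad-hoc, cross-DAG queries (date window, state,
    duration, pool). Prefer list_task_instances_batch when you already
    hold explicit dag_id/dag_run_id/task_id combinations to resolve in
    bulk.

    Results are paginated: repeat the call, advancing offset by limit,
    until total_entries is exhausted.
    """
    ...

print(list_task_instances_all.__doc__.splitlines()[0])
```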
