get_xcom_entry

Retrieve a specific cross-communication (XCom) value from Apache Airflow by providing the DAG, DAG run, task, and XCom key identifiers, giving access to data shared between tasks.
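
Under the hood, the tool issues a single GET request against Airflow's stable REST API. A minimal sketch of the equivalent direct call with httpx, assuming a local webserver with HTTP basic auth (the host, credentials, and identifiers are placeholders, not values from this project):

    import httpx

    # Equivalent raw request (hypothetical host, credentials, and identifiers).
    resp = httpx.get(
        "http://localhost:8080/api/v1/dags/example_etl/dagRuns"
        "/manual__2024-01-01T00:00:00+00:00/taskInstances/extract/xcomEntries/return_value",
        auth=("airflow", "airflow"),
    )
    resp.raise_for_status()
    print(resp.json())  # the XCom entry, including its stored value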

Instructions

[Tool Role]: Gets a specific XCom entry.

Input Schema

Name        Required  Description            Default
dag_id      Yes       ID of the DAG          -
dag_run_id  Yes       ID of the DAG run      -
task_id     Yes       ID of the task         -
xcom_key    Yes       Key of the XCom entry  -
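
All four parameters are required and have no defaults. An illustrative argument set for a call to this tool, reusing the hypothetical identifiers from the example above:

    arguments = {
        "dag_id": "example_etl",                            # DAG that produced the XCom
        "dag_run_id": "manual__2024-01-01T00:00:00+00:00",  # specific run of that DAG
        "task_id": "extract",                               # task instance that pushed the value
        "xcom_key": "return_value",                         # Airflow's default key for a task's return value
    }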

Implementation Reference

  • The core handler function for the 'get_xcom_entry' tool. It is decorated with @mcp.tool(), which registers it with the MCP server, and it retrieves the requested XCom entry via the Airflow REST API.
    @mcp.tool()
    async def get_xcom_entry(dag_id: str, dag_run_id: str, task_id: str, xcom_key: str) -> Dict[str, Any]:
        """[Tool Role]: Gets a specific XCom entry."""
        resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}")
        resp.raise_for_status()
        return resp.json()
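  • The handler depends on a module-level airflow_request helper that must be injected before registration (see the None check in register_common_tools below); its definition is not part of this reference. A minimal sketch of such a helper, assuming httpx with HTTP basic auth; the environment variable names and defaults are illustrative, not the project's actual implementation:
    import os
    import httpx

    # Assumed configuration; the real project may use different variable names and auth.
    AIRFLOW_API_BASE = os.getenv("AIRFLOW_API_URL", "http://localhost:8080/api/v1")

    async def airflow_request(method: str, path: str, **kwargs) -> httpx.Response:
        """Send an authenticated request to the Airflow REST API and return the raw response."""
        auth = (os.getenv("AIRFLOW_API_USERNAME", "airflow"),
                os.getenv("AIRFLOW_API_PASSWORD", "airflow"))
        async with httpx.AsyncClient(base_url=AIRFLOW_API_BASE, auth=auth, timeout=30.0) as client:
            return await client.request(method, path, **kwargs)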
  • The register_common_tools function defines and registers all common tools, including get_xcom_entry, using @mcp.tool() decorators.
    def register_common_tools(mcp):
        """Register all 43 common tools that work with both v1 and v2 APIs."""
        
        if airflow_request is None:
            raise RuntimeError("airflow_request function must be set before registering common tools")
        
        logger.info("Registering common tools shared between v1 and v2")
    
        # Template & Prompt Management
        @mcp.tool()
        async def get_prompt_template(section: Optional[str] = None, mode: Optional[str] = None) -> str:
            """
            [Tool Role]: Provides comprehensive prompt template for LLM interactions with Airflow operations.
    
            Args:
                section: Optional section name to get specific part of template
                mode: Optional mode (summary/detailed) to control response verbosity
    
            Returns:
                Comprehensive template or specific section for optimal LLM guidance
            """
            template = read_prompt_template(PROMPT_TEMPLATE_PATH)
            
            if section:
                sections = parse_prompt_sections(template)
                for i, s in enumerate(sections):
                    if section.lower() in s.lower():
                        return sections[i + 1]  # +1 to skip the title section
                return f"Section '{section}' not found."
        
            return template
    
        # DAG Management (11 tools)
        @mcp.tool()
        async def list_dags(limit: int = 20,
                      offset: int = 0,
                      fetch_all: bool = False,
                      id_contains: Optional[str] = None,
                      name_contains: Optional[str] = None) -> Dict[str, Any]:
            """
            [Tool Role]: Lists all DAGs registered in the Airflow cluster with pagination support.
        
            Args:
                limit: Maximum number of DAGs to return (default: 20)
                offset: Number of DAGs to skip for pagination (default: 0)
                fetch_all: If True, fetches all DAGs regardless of limit/offset
                id_contains: Filter DAGs by ID containing this string
                name_contains: Filter DAGs by display name containing this string
    
            Returns:
                Dict containing dags list, pagination info, and total counts
            """
            return await list_dags_internal(limit, offset, fetch_all, id_contains, name_contains)
    
        @mcp.tool()
        async def get_dag(dag_id: str) -> Dict[str, Any]:
            """
            [Tool Role]: Retrieves detailed information for a specific DAG.
    
            Args:
                dag_id: The DAG ID to get details for
    
            Returns:
                Comprehensive DAG details
            """
            return await get_dag_detailed_info(dag_id)
    
        @mcp.tool()
        async def get_dags_detailed_batch(
            limit: int = 100,
            offset: int = 0,
            fetch_all: bool = False,
            id_contains: Optional[str] = None,
            name_contains: Optional[str] = None,
            is_active: Optional[bool] = None,
            is_paused: Optional[bool] = None
        ) -> Dict[str, Any]:
            """
            [Tool Role]: Retrieves detailed information for multiple DAGs in batch with latest run information.
            """
            dag_list_result = await list_dags_internal(
                limit=limit, 
                offset=offset, 
                fetch_all=fetch_all,
                id_contains=id_contains,
                name_contains=name_contains
            )
        
            dags_basic = dag_list_result.get("dags", [])
            detailed_dags = []
            success_count = 0
            error_count = 0
            errors = []
            skipped_count = 0
        
            for dag_basic in dags_basic:
                dag_id = dag_basic.get("dag_id")
                if not dag_id:
                    skipped_count += 1
                    continue
                
                if is_active is not None and dag_basic.get("is_active") != is_active:
                    skipped_count += 1
                    continue
                if is_paused is not None and dag_basic.get("is_paused") != is_paused:
                    skipped_count += 1
                    continue
                
                try:
                    detailed_dag = await get_dag_detailed_info(dag_id)
                    
                    try:
                        latest_run_resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date")
                        latest_run_resp.raise_for_status()
                        latest_runs = latest_run_resp.json().get("dag_runs", [])
                    
                        if latest_runs:
                            latest_run = latest_runs[0]
                            detailed_dag["latest_dag_run"] = {
                                "run_id": latest_run.get("run_id"),
                                "run_type": latest_run.get("run_type"),
                                "state": latest_run.get("state"),
                                "execution_date": latest_run.get("execution_date"),
                                "start_date": latest_run.get("start_date"),
                                "end_date": latest_run.get("end_date"),
                                "data_interval_start": latest_run.get("data_interval_start"),
                                "data_interval_end": latest_run.get("data_interval_end"),
                                "external_trigger": latest_run.get("external_trigger"),
                                "conf": latest_run.get("conf"),
                                "note": latest_run.get("note")
                            }
                        else:
                            detailed_dag["latest_dag_run"] = None
                    except Exception:
                        detailed_dag["latest_dag_run"] = None
                    
                    detailed_dags.append(detailed_dag)
                    success_count += 1
                except Exception as e:
                    error_count += 1
                    errors.append({
                        "dag_id": dag_id,
                        "error": str(e)
                    })
        
            return {
                "dags_detailed": detailed_dags,
                "total_processed": success_count,
                "total_available": dag_list_result.get("total_entries", 0),
                "returned_count": len(detailed_dags),
                "processing_stats": {
                    "success_count": success_count,
                    "error_count": error_count,
                    "skipped_count": skipped_count,
                    "errors": errors
                },
                "applied_filters": {
                    "id_contains": id_contains,
                    "name_contains": name_contains,
                    "is_active": is_active,
                    "is_paused": is_paused,
                    "limit": limit,
                    "offset": offset,
                    "fetch_all": fetch_all
                },
                "pagination_info": dag_list_result.get("pagination_info", {}),
                "has_more_pages": dag_list_result.get("has_more_pages", False),
                "next_offset": dag_list_result.get("next_offset")
            }
    
        @mcp.tool()
        async def running_dags() -> Dict[str, Any]:
            """[Tool Role]: Lists all currently running DAG runs in the Airflow cluster."""
            resp = await airflow_request("GET", "/dags/~/dagRuns?state=running&limit=1000&order_by=-start_date")
            resp.raise_for_status()
            data = resp.json()
        
            running_runs = []
            for run in data.get("dag_runs", []):
                run_info = {
                    "dag_id": run.get("dag_id"),
                    "dag_display_name": run.get("dag_display_name"),
                    "run_id": run.get("run_id"),
                    "run_type": run.get("run_type"),
                    "state": run.get("state"),
                    "execution_date": run.get("execution_date"),
                    "start_date": run.get("start_date"),
                    "end_date": run.get("end_date"),
                    "data_interval_start": run.get("data_interval_start"),
                    "data_interval_end": run.get("data_interval_end"),
                    "external_trigger": run.get("external_trigger"),
                    "conf": run.get("conf"),
                    "note": run.get("note")
                }
                running_runs.append(run_info)
        
            return {
                "dag_runs": running_runs,
                "total_running": len(running_runs),
                "query_info": {
                    "state_filter": "running",
                    "limit": 1000,
                    "order_by": "start_date (descending)"
                }
            }
    
        @mcp.tool()
        async def failed_dags() -> Dict[str, Any]:
            """[Tool Role]: Lists all recently failed DAG runs in the Airflow cluster."""
            resp = await airflow_request("GET", "/dags/~/dagRuns?state=failed&limit=1000&order_by=-start_date")
            resp.raise_for_status()
            data = resp.json()
        
            failed_runs = []
            for run in data.get("dag_runs", []):
                run_info = {
                    "dag_id": run.get("dag_id"),
                    "dag_display_name": run.get("dag_display_name"),
                    "run_id": run.get("run_id"),
                    "run_type": run.get("run_type"),
                    "state": run.get("state"),
                    "execution_date": run.get("execution_date"),
                    "start_date": run.get("start_date"),
                    "end_date": run.get("end_date"),
                    "data_interval_start": run.get("data_interval_start"),
                    "data_interval_end": run.get("data_interval_end"),
                    "external_trigger": run.get("external_trigger"),
                    "conf": run.get("conf"),
                    "note": run.get("note")
                }
                failed_runs.append(run_info)
        
            return {
                "dag_runs": failed_runs,
                "total_failed": len(failed_runs),
                "query_info": {
                    "state_filter": "failed",
                    "limit": 1000,
                    "order_by": "start_date (descending)"
                }
            }
    
        @mcp.tool()
        async def trigger_dag(dag_id: str) -> Dict[str, Any]:
            """[Tool Role]: Triggers a new DAG run for a specified Airflow DAG."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            resp = await airflow_request("POST", f"/dags/{dag_id}/dagRuns", json={"conf": {}})
            resp.raise_for_status()
            run = resp.json()
            return {
                "dag_id": dag_id,
                "run_id": run.get("run_id"),
                "state": run.get("state"),
                "execution_date": run.get("execution_date"),
                "start_date": run.get("start_date"),
                "end_date": run.get("end_date")
            }
    
        @mcp.tool()
        async def pause_dag(dag_id: str) -> Dict[str, Any]:
            """[Tool Role]: Pauses the specified Airflow DAG (prevents scheduling new runs)."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            resp = await airflow_request("PATCH", f"/dags/{dag_id}", json={"is_paused": True})
            resp.raise_for_status()
            dag_data = resp.json()
            return {
                "dag_id": dag_id,
                "is_paused": dag_data.get("is_paused")
            }
    
        @mcp.tool()
        async def unpause_dag(dag_id: str) -> Dict[str, Any]:
            """[Tool Role]: Unpauses the specified Airflow DAG (allows scheduling new runs)."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            resp = await airflow_request("PATCH", f"/dags/{dag_id}", json={"is_paused": False})
            resp.raise_for_status()
            dag_data = resp.json()
            return {
                "dag_id": dag_id,
                "is_paused": dag_data.get("is_paused")
            }
    
        @mcp.tool()
        async def dag_graph(dag_id: str) -> Dict[str, Any]:
            """[Tool Role]: Retrieves task graph structure for the specified DAG."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            resp = await airflow_request("GET", f"/dags/{dag_id}/tasks")
            resp.raise_for_status()
            tasks_data = resp.json()
            
            tasks = tasks_data.get("tasks", [])
            task_graph = {}
            
            for task in tasks:
                task_id = task.get("task_id")
                task_graph[task_id] = {
                    "task_id": task_id,
                    "task_type": task.get("class_ref", {}).get("class_name"),
                    "downstream_task_ids": task.get("downstream_task_ids", []),
                    "upstream_task_ids": task.get("upstream_task_ids", [])
                }
            
            return {
                "dag_id": dag_id,
                "task_graph": task_graph,
                "total_tasks": len(tasks),
                "task_relationships": {
                    "nodes": list(task_graph.keys()),
                    "edges": [(task_id, downstream) for task_id, task_info in task_graph.items() 
                             for downstream in task_info["downstream_task_ids"]]
                }
            }
    
        @mcp.tool()
        async def list_tasks(dag_id: str) -> Dict[str, Any]:
            """[Tool Role]: Lists all tasks within the specified DAG."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            resp = await airflow_request("GET", f"/dags/{dag_id}/tasks")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def dag_code(dag_id: str) -> Dict[str, Any]:
            """[Tool Role]: Retrieves the source code for the specified DAG."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            resp = await airflow_request("GET", f"/dagSources/{dag_id}")
            resp.raise_for_status()
            return resp.json()
    
        # Continue with remaining tools...
        
        # Event/Log Management (3 tools)
        @mcp.tool()
        async def list_event_logs(dag_id: Optional[str] = None, limit: int = 20, offset: int = 0) -> Dict[str, Any]:
            """[Tool Role]: Lists event logs from Airflow."""
            params = {'limit': limit, 'offset': offset}
            if dag_id:
                params['dag_id'] = dag_id
            
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            resp = await airflow_request("GET", f"/eventLogs?{query_string}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_event_log(event_log_id: int) -> Dict[str, Any]:
            """[Tool Role]: Retrieves a specific event log entry."""
            resp = await airflow_request("GET", f"/eventLogs/{event_log_id}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def all_dag_event_summary() -> Dict[str, Any]:
            """[Tool Role]: Provides summary of event logs across all DAGs."""
            resp = await airflow_request("GET", "/eventLogs?limit=1000")
            resp.raise_for_status()
            data = resp.json()
            
            event_summary = {}
            for event in data.get("event_logs", []):
                dag_id = event.get("dag_id", "unknown")
                event_type = event.get("event", "unknown")
                
                if dag_id not in event_summary:
                    event_summary[dag_id] = {}
                if event_type not in event_summary[dag_id]:
                    event_summary[dag_id][event_type] = 0
                event_summary[dag_id][event_type] += 1
            
            return {
                "event_summary": event_summary,
                "total_events": len(data.get("event_logs", [])),
                "unique_dags": len(event_summary)
            }
    
        # Import Error Management (3 tools)
        @mcp.tool()
        async def list_import_errors(limit: int = 20, offset: int = 0) -> Dict[str, Any]:
            """[Tool Role]: Lists import errors in Airflow."""
            params = {'limit': limit, 'offset': offset}
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            resp = await airflow_request("GET", f"/importErrors?{query_string}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_import_error(import_error_id: int) -> Dict[str, Any]:
            """[Tool Role]: Retrieves a specific import error."""
            resp = await airflow_request("GET", f"/importErrors/{import_error_id}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def all_dag_import_summary() -> Dict[str, Any]:
            """[Tool Role]: Provides summary of import errors across all DAGs."""
            resp = await airflow_request("GET", "/importErrors?limit=1000")
            resp.raise_for_status()
            data = resp.json()
            
            import_summary = {}
            for error in data.get("import_errors", []):
                filename = error.get("filename", "unknown")
                if filename not in import_summary:
                    import_summary[filename] = []
                import_summary[filename].append({
                    "id": error.get("id"),
                    "timestamp": error.get("timestamp"),
                    "stacktrace": error.get("stacktrace", "")[:200] + "..."
                })
            
            return {
                "import_errors_summary": import_summary,
                "total_errors": len(data.get("import_errors", [])),
                "files_with_errors": len(import_summary)
            }
    
        # Analysis/Statistics (3 tools)
        @mcp.tool()
        async def dag_run_duration(dag_id: str, limit: int = 10) -> Dict[str, Any]:
            """[Tool Role]: Analyzes DAG run durations and performance metrics."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            
            resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit={limit}&order_by=-execution_date")
            resp.raise_for_status()
            data = resp.json()
            
            runs = data.get("dag_runs", [])
            durations = []
            
            from datetime import datetime

            for run in runs:
                start_date = run.get("start_date")
                end_date = run.get("end_date")
                if start_date and end_date:
                    start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
                    end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
                    duration_seconds = (end - start).total_seconds()
                    durations.append({
                        "run_id": run.get("run_id"),
                        "duration_seconds": duration_seconds,
                        "state": run.get("state"),
                        "execution_date": run.get("execution_date")
                    })
            
            avg_duration = sum(d["duration_seconds"] for d in durations) / len(durations) if durations else 0
            
            return {
                "dag_id": dag_id,
                "run_durations": durations,
                "statistics": {
                    "average_duration_seconds": avg_duration,
                    "total_analyzed_runs": len(durations),
                    "fastest_run": min(durations, key=lambda x: x["duration_seconds"]) if durations else None,
                    "slowest_run": max(durations, key=lambda x: x["duration_seconds"]) if durations else None
                }
            }
    
        @mcp.tool()
        async def dag_task_duration(dag_id: str, dag_run_id: Optional[str] = None) -> Dict[str, Any]:
            """[Tool Role]: Analyzes task durations within a DAG run."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            
            if not dag_run_id:
                # Get the latest run
                resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date")
                resp.raise_for_status()
                runs = resp.json().get("dag_runs", [])
                if not runs:
                    return {"error": f"No DAG runs found for DAG {dag_id}"}
                dag_run_id = runs[0]["run_id"]
            
            resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances")
            resp.raise_for_status()
            data = resp.json()
            
            task_durations = []
            from datetime import datetime

            for task in data.get("task_instances", []):
                start_date = task.get("start_date")
                end_date = task.get("end_date")
                if start_date and end_date:
                    start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
                    end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
                    duration_seconds = (end - start).total_seconds()
                    task_durations.append({
                        "task_id": task.get("task_id"),
                        "duration_seconds": duration_seconds,
                        "state": task.get("state"),
                        "start_date": start_date,
                        "end_date": end_date
                    })
            
            return {
                "dag_id": dag_id,
                "dag_run_id": dag_run_id,
                "task_durations": task_durations,
                "total_tasks": len(task_durations)
            }
    
        @mcp.tool()
        async def dag_calendar(dag_id: str, start_date: str, end_date: str) -> Dict[str, Any]:
            """[Tool Role]: Shows DAG schedule and execution calendar for a date range."""
            if not dag_id:
                raise ValueError("dag_id must not be empty")
            
            params = {
                'start_date_gte': start_date,
                'start_date_lte': end_date,
                'limit': 1000
            }
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            
            resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?{query_string}")
            resp.raise_for_status()
            data = resp.json()
            
            calendar_data = []
            for run in data.get("dag_runs", []):
                calendar_data.append({
                    "execution_date": run.get("execution_date"),
                    "start_date": run.get("start_date"),
                    "end_date": run.get("end_date"),
                    "state": run.get("state"),
                    "run_type": run.get("run_type")
                })
            
            return {
                "dag_id": dag_id,
                "date_range": {"start": start_date, "end": end_date},
                "calendar_entries": calendar_data,
                "total_runs": len(calendar_data)
            }
    
        # System Information (6 tools)
        @mcp.tool()
        async def get_health() -> Dict[str, Any]:
            """[Tool Role]: Checks Airflow cluster health status."""
            # Import here to avoid circular imports
            from ..functions import get_api_version
            
            api_version = get_api_version()
            
            if api_version == "v2":
                # v2 API: Use /monitor/health endpoint (Airflow 3.x)
                resp = await airflow_request("GET", "/monitor/health")
            else:
                # v1 API: Use /health endpoint (Airflow 2.x)
                resp = await airflow_request("GET", "/health")
            
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_version() -> Dict[str, Any]:
            """[Tool Role]: Gets Airflow version information."""
            resp = await airflow_request("GET", "/version")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_config() -> Dict[str, Any]:
            """[Tool Role]: Retrieves Airflow configuration."""
            resp = await airflow_request("GET", "/config")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def list_config_sections() -> Dict[str, Any]:
            """[Tool Role]: Lists all configuration sections with summary."""
            try:
                resp = await airflow_request("GET", "/config")
                resp.raise_for_status()
                config_data = resp.json()
                
                sections_summary = {}
                for section_name, section_data in config_data.get("sections", {}).items():
                    options_count = len(section_data.get("options", {}))
                    sections_summary[section_name] = {
                        "options_count": options_count,
                        "sample_options": list(section_data.get("options", {}).keys())[:5]
                    }
                
                return {
                    "sections_summary": sections_summary,
                    "total_sections": len(sections_summary)
                }
            except Exception as e:
                return {
                    "error": f"Configuration access denied: {str(e)}",
                    "note": "This requires 'expose_config = True' in airflow.cfg [webserver] section"
                }
    
        @mcp.tool()
        async def get_config_section(section_name: str) -> Dict[str, Any]:
            """[Tool Role]: Gets all options within a specific configuration section."""
            try:
                resp = await airflow_request("GET", "/config")
                resp.raise_for_status()
                config_data = resp.json()
                
                section_data = config_data.get("sections", {}).get(section_name)
                if not section_data:
                    return {"error": f"Section '{section_name}' not found"}
                
                return {
                    "section_name": section_name,
                    "options": section_data.get("options", {}),
                    "options_count": len(section_data.get("options", {}))
                }
            except Exception as e:
                return {
                    "error": f"Configuration access denied: {str(e)}",
                    "note": "This requires 'expose_config = True' in airflow.cfg [webserver] section"
                }
    
        @mcp.tool()
        async def search_config_options(search_term: str) -> Dict[str, Any]:
            """[Tool Role]: Searches for configuration options matching a term."""
            try:
                resp = await airflow_request("GET", "/config")
                resp.raise_for_status()
                config_data = resp.json()
                
                matching_options = {}
                for section_name, section_data in config_data.get("sections", {}).items():
                    section_matches = {}
                    for option_name, option_data in section_data.get("options", {}).items():
                        if search_term.lower() in option_name.lower() or search_term.lower() in str(option_data.get("value", "")).lower():
                            section_matches[option_name] = option_data
                    
                    if section_matches:
                        matching_options[section_name] = section_matches
                
                return {
                    "search_term": search_term,
                    "matching_options": matching_options,
                    "total_matches": sum(len(section) for section in matching_options.values())
                }
            except Exception as e:
                return {
                    "error": f"Configuration access denied: {str(e)}",
                    "note": "This requires 'expose_config = True' in airflow.cfg [webserver] section"
                }
    
        # Pool Management (2 tools)
        @mcp.tool()
        async def list_pools(limit: int = 20, offset: int = 0) -> Dict[str, Any]:
            """[Tool Role]: Lists all pools in Airflow."""
            params = {'limit': limit, 'offset': offset}
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            resp = await airflow_request("GET", f"/pools?{query_string}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_pool(pool_name: str) -> Dict[str, Any]:
            """[Tool Role]: Gets details for a specific pool."""
            resp = await airflow_request("GET", f"/pools/{pool_name}")
            resp.raise_for_status()
            return resp.json()
    
        # Task Instance Management (5 tools)
        @mcp.tool()
        async def list_task_instances_all(
            dag_id: Optional[str] = None,
            dag_run_id: Optional[str] = None,
            task_id: Optional[str] = None,
            execution_date_gte: Optional[str] = None,
            execution_date_lte: Optional[str] = None,
            start_date_gte: Optional[str] = None,
            start_date_lte: Optional[str] = None,
            end_date_gte: Optional[str] = None,
            end_date_lte: Optional[str] = None,
            duration_gte: Optional[float] = None,
            duration_lte: Optional[float] = None,
            state: Optional[List[str]] = None,
            pool: Optional[List[str]] = None,
            limit: int = 100,
            offset: int = 0
        ) -> Dict[str, Any]:
            """[Tool Role]: Lists task instances with comprehensive filtering options."""
            params = {'limit': limit, 'offset': offset}
            
            # Add all filter parameters
            if dag_id:
                params['dag_id'] = dag_id
            if dag_run_id:
                params['dag_run_id'] = dag_run_id
            if task_id:
                params['task_id'] = task_id
            if execution_date_gte:
                params['execution_date_gte'] = execution_date_gte
            if execution_date_lte:
                params['execution_date_lte'] = execution_date_lte
            if start_date_gte:
                params['start_date_gte'] = start_date_gte
            if start_date_lte:
                params['start_date_lte'] = start_date_lte
            if end_date_gte:
                params['end_date_gte'] = end_date_gte
            if end_date_lte:
                params['end_date_lte'] = end_date_lte
            if duration_gte:
                params['duration_gte'] = duration_gte
            if duration_lte:
                params['duration_lte'] = duration_lte
            if state:
                params['state'] = state
            if pool:
                params['pool'] = pool
            
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            resp = await airflow_request("GET", f"/taskInstances?{query_string}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_task_instance_details(dag_id: str, dag_run_id: str, task_id: str) -> Dict[str, Any]:
            """[Tool Role]: Gets detailed information for a specific task instance."""
            resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def list_task_instances_batch(
            limit: int = 100,
            offset: int = 0,
            start_date_gte: Optional[str] = None,
            start_date_lte: Optional[str] = None,
            state: Optional[List[str]] = None
        ) -> Dict[str, Any]:
            """[Tool Role]: Lists task instances in batch with date and state filtering."""
            params = {'limit': limit, 'offset': offset}
            
            if start_date_gte:
                params['start_date_gte'] = start_date_gte
            if start_date_lte:
                params['start_date_lte'] = start_date_lte
            if state:
                params['state'] = state
            
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            resp = await airflow_request("GET", f"/taskInstances?{query_string}")
            resp.raise_for_status()
            data = resp.json()
            
            # Add summary statistics
            task_instances = data.get("task_instances", [])
            state_summary = {}
            for task in task_instances:
                task_state = task.get("state", "unknown")
                state_summary[task_state] = state_summary.get(task_state, 0) + 1
            
            data["state_summary"] = state_summary
            return data
    
        @mcp.tool()
        async def get_task_instance_extra_links(dag_id: str, dag_run_id: str, task_id: str) -> Dict[str, Any]:
            """[Tool Role]: Gets extra links for a task instance."""
            resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_task_instance_logs(dag_id: str, dag_run_id: str, task_id: str, try_number: int = 1) -> Dict[str, Any]:
            """[Tool Role]: Retrieves logs for a specific task instance."""
            resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}")
            resp.raise_for_status()
            return resp.json()
    
        # Variable Management (2 tools)
        @mcp.tool()
        async def list_variables(limit: int = 20, offset: int = 0) -> Dict[str, Any]:
            """[Tool Role]: Lists all variables in Airflow."""
            params = {'limit': limit, 'offset': offset}
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            resp = await airflow_request("GET", f"/variables?{query_string}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_variable(variable_key: str) -> Dict[str, Any]:
            """[Tool Role]: Gets the value of a specific variable."""
            resp = await airflow_request("GET", f"/variables/{variable_key}")
            resp.raise_for_status()
            return resp.json()
    
        # XCom Management (2 tools)
        @mcp.tool()
        async def list_xcom_entries(dag_id: str, dag_run_id: str, task_id: str, limit: int = 20, offset: int = 0) -> Dict[str, Any]:
            """[Tool Role]: Lists XCom entries for a specific task instance."""
            params = {'limit': limit, 'offset': offset}
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries?{query_string}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_xcom_entry(dag_id: str, dag_run_id: str, task_id: str, xcom_key: str) -> Dict[str, Any]:
            """[Tool Role]: Gets a specific XCom entry."""
            resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}")
            resp.raise_for_status()
            return resp.json()
    
        # Connection Management (5 tools)
        @mcp.tool()
        async def list_connections(limit: int = 20, offset: int = 0) -> Dict[str, Any]:
            """[Tool Role]: Lists all connections in Airflow."""
            params = {'limit': limit, 'offset': offset}
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            resp = await airflow_request("GET", f"/connections?{query_string}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_connection(connection_id: str) -> Dict[str, Any]:
            """[Tool Role]: Gets details for a specific connection."""
            resp = await airflow_request("GET", f"/connections/{connection_id}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def create_connection(connection_data: Dict[str, Any]) -> Dict[str, Any]:
            """[Tool Role]: Creates a new connection."""
            resp = await airflow_request("POST", "/connections", json=connection_data)
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def update_connection(connection_id: str, connection_data: Dict[str, Any]) -> Dict[str, Any]:
            """[Tool Role]: Updates an existing connection."""
            resp = await airflow_request("PATCH", f"/connections/{connection_id}", json=connection_data)
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def delete_connection(connection_id: str) -> Dict[str, Any]:
            """[Tool Role]: Deletes a connection."""
            resp = await airflow_request("DELETE", f"/connections/{connection_id}")
            resp.raise_for_status()
            return {"message": f"Connection {connection_id} deleted successfully"}
    
        # User & Permissions Management (4 tools) - v1 API only
        @mcp.tool()
        async def list_users(limit: int = 20, offset: int = 0) -> Dict[str, Any]:
            """[Tool Role]: Lists all users in the Airflow system (v1 API only)."""
            from ..functions import get_api_version
            
            api_version = get_api_version()
            if api_version == "v2":
                return {"error": "User management is not available in Airflow 3.x (API v2)", "available_in": "v1 only"}
            
            params = []
            params.append(f"limit={limit}")
            if offset > 0:
                params.append(f"offset={offset}")
            
            query_string = "&".join(params) if params else ""
            endpoint = f"/users?{query_string}" if query_string else "/users"
            
            resp = await airflow_request("GET", endpoint)
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_user(username: str) -> Dict[str, Any]:
            """[Tool Role]: Gets details of a specific user (v1 API only)."""
            from ..functions import get_api_version
            
            api_version = get_api_version()
            if api_version == "v2":
                return {"error": "User management is not available in Airflow 3.x (API v2)", "available_in": "v1 only"}
            
            resp = await airflow_request("GET", f"/users/{username}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def list_permissions() -> Dict[str, Any]:
            """[Tool Role]: Lists all permissions available in the Airflow system (v1 API only)."""
            from ..functions import get_api_version
            
            api_version = get_api_version()
            if api_version == "v2":
                return {"error": "Permission management is not available in Airflow 3.x (API v2)", "available_in": "v1 only"}
            
            resp = await airflow_request("GET", "/permissions")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def list_roles(limit: int = 20, offset: int = 0) -> Dict[str, Any]:
            """[Tool Role]: Lists all roles in the Airflow system (v1 API only)."""
            from ..functions import get_api_version
            
            api_version = get_api_version()
            if api_version == "v2":
                return {"error": "Role management is not available in Airflow 3.x (API v2)", "available_in": "v1 only"}
            
            params = []
            params.append(f"limit={limit}")
            if offset > 0:
                params.append(f"offset={offset}")
            
            query_string = "&".join(params) if params else ""
            endpoint = f"/roles?{query_string}" if query_string else "/roles"
            
            resp = await airflow_request("GET", endpoint)
            resp.raise_for_status()
            return resp.json()
    
        # Plugin Management (1 tool)
        @mcp.tool()
        async def list_plugins() -> Dict[str, Any]:
            """[Tool Role]: Lists all installed plugins in the Airflow system."""
            resp = await airflow_request("GET", "/plugins")
            resp.raise_for_status()
            return resp.json()
    
        # Provider Management (2 tools)
        @mcp.tool()
        async def list_providers() -> Dict[str, Any]:
            """[Tool Role]: Lists all provider packages installed in the Airflow system."""
            resp = await airflow_request("GET", "/providers")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_provider(provider_name: str) -> Dict[str, Any]:
            """[Tool Role]: Gets details of a specific provider package."""
            resp = await airflow_request("GET", f"/providers/{provider_name}")
            resp.raise_for_status()
            return resp.json()
    
        # Dataset Management (4 tools) - v1 API only (v2 uses Assets instead)
        @mcp.tool()
        async def list_datasets(limit: int = 20, offset: int = 0, uri_pattern: Optional[str] = None) -> Dict[str, Any]:
            """[Tool Role]: Lists all datasets in the Airflow system (v1 API only - v2 uses Assets)."""
            from ..functions import get_api_version
            
            api_version = get_api_version()
            if api_version == "v2":
                return {
                    "error": "Dataset API is not available in Airflow 3.x (API v2)", 
                    "available_in": "v1 only",
                    "v2_alternative": "Use list_assets() for Airflow 3.x data-aware scheduling"
                }
            
            params = []
            params.append(f"limit={limit}")
            if offset > 0:
                params.append(f"offset={offset}")
            if uri_pattern:
                params.append(f"uri_pattern={uri_pattern}")
            
            query_string = "&".join(params) if params else ""
            endpoint = f"/datasets?{query_string}" if query_string else "/datasets"
            
            resp = await airflow_request("GET", endpoint)
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_dataset(dataset_uri: str) -> Dict[str, Any]:
            """[Tool Role]: Gets details of a specific dataset (v1 API only - v2 uses Assets)."""
            from ..functions import get_api_version
            
            api_version = get_api_version()
            if api_version == "v2":
                return {
                    "error": "Dataset API is not available in Airflow 3.x (API v2)", 
                    "available_in": "v1 only",
                    "v2_alternative": "Use Assets API for Airflow 3.x data-aware scheduling"
                }
            
            # URL encode the URI to handle special characters
            import urllib.parse
            encoded_uri = urllib.parse.quote(dataset_uri, safe='')
            
            resp = await airflow_request("GET", f"/datasets/{encoded_uri}")
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def list_dataset_events(limit: int = 20, offset: int = 0, 
                                     dataset_uri: Optional[str] = None,
                                     source_dag_id: Optional[str] = None) -> Dict[str, Any]:
            """[Tool Role]: Lists dataset events for data lineage tracking (v1 API only - v2 uses Assets)."""
            from ..functions import get_api_version
            
            api_version = get_api_version()
            if api_version == "v2":
                return {
                    "error": "Dataset events API is not available in Airflow 3.x (API v2)", 
                    "available_in": "v1 only",
                    "v2_alternative": "Use list_asset_events() for Airflow 3.x data lineage tracking"
                }
            
            params = []
            params.append(f"limit={limit}")
            if offset > 0:
                params.append(f"offset={offset}")
            if dataset_uri:
                params.append(f"dataset_uri={dataset_uri}")
            if source_dag_id:
                params.append(f"source_dag_id={source_dag_id}")
            
            query_string = "&".join(params) if params else ""
            endpoint = f"/datasets/events?{query_string}" if query_string else "/datasets/events"
            
            resp = await airflow_request("GET", endpoint)
            resp.raise_for_status()
            return resp.json()
    
        @mcp.tool()
        async def get_dataset_events(dataset_uri: str, limit: int = 20, offset: int = 0) -> Dict[str, Any]:
            """[Tool Role]: Gets events for a specific dataset (v1 API only - v2 uses Assets)."""
            from ..functions import get_api_version
            
            api_version = get_api_version()
            if api_version == "v2":
                return {
                    "error": "Dataset events API is not available in Airflow 3.x (API v2)", 
                    "available_in": "v1 only",
                    "v2_alternative": "Use list_asset_events() for Airflow 3.x data lineage tracking"
                }
            
            import urllib.parse
            encoded_uri = urllib.parse.quote(dataset_uri, safe='')
            
            params = []
            params.append(f"limit={limit}")
            if offset > 0:
                params.append(f"offset={offset}")
            
            query_string = "&".join(params) if params else ""
            endpoint = f"/datasets/{encoded_uri}/events?{query_string}" if query_string else f"/datasets/{encoded_uri}/events"
            
            resp = await airflow_request("GET", endpoint)
            resp.raise_for_status()
            return resp.json()
    
        logger.info("Registered all common tools (56 tools total: 43 original + 13 new management tools)")
