Skip to main content
Glama

get_dags_detailed_batch

Retrieve detailed information for multiple Airflow DAGs in batch, including latest run data, to monitor and analyze workflow performance.

Instructions

[Tool Role]: Retrieves detailed information for multiple DAGs in batch with latest run information.

Input Schema

Table / JSON Schema

| Name          | Required | Description | Default |
|---------------|----------|-------------|---------|
| limit         | No       |             |         |
| offset        | No       |             |         |
| fetch_all     | No       |             |         |
| id_contains   | No       |             |         |
| name_contains | No       |             |         |
| is_active     | No       |             |         |
| is_paused     | No       |             |         |

Implementation Reference

  • The core handler function for the 'get_dags_detailed_batch' MCP tool. This async function retrieves detailed information for multiple DAGs in batch, including their latest DAG run details. It uses helper functions like list_dags_internal and get_dag_detailed_info, applies filters, handles errors, and returns comprehensive statistics and results.
    @mcp.tool()
    async def get_dags_detailed_batch(
        limit: int = 100,
        offset: int = 0,
        fetch_all: bool = False,
        id_contains: Optional[str] = None,
        name_contains: Optional[str] = None,
        is_active: Optional[bool] = None,
        is_paused: Optional[bool] = None
    ) -> Dict[str, Any]:
        """
        [Tool Role]: Retrieves detailed information for multiple DAGs in batch with latest run information.

        Lists DAGs via list_dags_internal (id/name filtering happens there),
        applies is_active / is_paused filters client-side, then fetches the
        detailed record for each remaining DAG and best-effort enriches it
        with its most recent run. Returns the detailed records together with
        processing statistics, the applied filters, and pagination metadata.
        """
        # Paginated DAG listing; id_contains/name_contains are delegated here.
        listing = await list_dags_internal(
            limit=limit,
            offset=offset,
            fetch_all=fetch_all,
            id_contains=id_contains,
            name_contains=name_contains
        )

        summaries = listing.get("dags", [])
        results = []
        errors = []
        ok = 0
        failed = 0
        skipped = 0

        # Fields copied verbatim from the latest dagRuns payload entry.
        run_fields = (
            "run_id", "run_type", "state", "execution_date", "start_date",
            "end_date", "data_interval_start", "data_interval_end",
            "external_trigger", "conf", "note",
        )

        for summary in summaries:
            dag_id = summary.get("dag_id")

            # Guard clauses: entries without an id, or failing the client-side
            # active/paused filters, count as skipped (not errors).
            if not dag_id:
                skipped += 1
                continue
            if is_active is not None and summary.get("is_active") != is_active:
                skipped += 1
                continue
            if is_paused is not None and summary.get("is_paused") != is_paused:
                skipped += 1
                continue

            try:
                detail = await get_dag_detailed_info(dag_id)

                # Best-effort enrichment with the newest run; any failure here
                # leaves latest_dag_run as None rather than failing the DAG.
                try:
                    resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date")
                    resp.raise_for_status()
                    runs = resp.json().get("dag_runs", [])
                    if runs:
                        newest = runs[0]
                        detail["latest_dag_run"] = {f: newest.get(f) for f in run_fields}
                    else:
                        detail["latest_dag_run"] = None
                except Exception:
                    detail["latest_dag_run"] = None

                results.append(detail)
                ok += 1
            except Exception as e:
                failed += 1
                errors.append({"dag_id": dag_id, "error": str(e)})

        return {
            "dags_detailed": results,
            "total_processed": ok,
            "total_available": listing.get("total_entries", 0),
            "returned_count": len(results),
            "processing_stats": {
                "success_count": ok,
                "error_count": failed,
                "skipped_count": skipped,
                "errors": errors
            },
            "applied_filters": {
                "id_contains": id_contains,
                "name_contains": name_contains,
                "is_active": is_active,
                "is_paused": is_paused,
                "limit": limit,
                "offset": offset,
                "fetch_all": fetch_all
            },
            "pagination_info": listing.get("pagination_info", {}),
            "has_more_pages": listing.get("has_more_pages", False),
            "next_offset": listing.get("next_offset")
        }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/call518/MCP-Airflow-API'

If you have feedback or need assistance with the MCP directory API, please join our Discord server