Skip to main content
Glama

get_dags_detailed_batch

Retrieve detailed information for multiple Airflow DAGs in batch, including latest run data, to monitor and analyze workflow performance.

Instructions

[Tool Role]: Retrieves detailed information for multiple DAGs in batch with latest run information.

Input Schema

Table | JSON Schema
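| Name | Required | Description | Default |
| --- | --- | --- | --- |
| limit | No | | |
| offset | No | | |
| fetch_all | No | | |
| id_contains | No | | |
| name_contains | No | | |
| is_active | No | | |
| is_paused | No | | |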

Implementation Reference

  • The core handler function for the 'get_dags_detailed_batch' MCP tool. This async function retrieves detailed information for multiple DAGs in batch, including their latest DAG run details. It uses helper functions like list_dags_internal and get_dag_detailed_info, applies filters, handles errors, and returns comprehensive statistics and results.
    # Fields copied verbatim from an Airflow DAG-run payload into the per-DAG
    # "latest_dag_run" summary. Order is preserved in the resulting dict.
    _LATEST_RUN_FIELDS = (
        "run_id",
        "run_type",
        "state",
        "execution_date",
        "start_date",
        "end_date",
        "data_interval_start",
        "data_interval_end",
        "external_trigger",
        "conf",
        "note",
    )


    async def _fetch_latest_dag_run(dag_id: str) -> Optional[Dict[str, Any]]:
        """Return a summary of the most recent run of *dag_id*, or None if it has no runs.

        Any exception raised by the request, the status check, or JSON decoding
        propagates to the caller, which decides how to report it.
        """
        response = await airflow_request(
            "GET", f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date"
        )
        response.raise_for_status()
        runs = response.json().get("dag_runs", [])
        if not runs:
            return None
        latest = runs[0]
        return {field: latest.get(field) for field in _LATEST_RUN_FIELDS}


    @mcp.tool()
    async def get_dags_detailed_batch(
        limit: int = 100,
        offset: int = 0,
        fetch_all: bool = False,
        id_contains: Optional[str] = None,
        name_contains: Optional[str] = None,
        is_active: Optional[bool] = None,
        is_paused: Optional[bool] = None
    ) -> Dict[str, Any]:
        """
        [Tool Role]: Retrieves detailed information for multiple DAGs in batch with latest run information.

        Args:
            limit: Passed through to ``list_dags_internal``.
            offset: Passed through to ``list_dags_internal``.
            fetch_all: Passed through to ``list_dags_internal``.
            id_contains: Passed through to ``list_dags_internal``.
            name_contains: Passed through to ``list_dags_internal``.
            is_active: When not None, DAGs whose listed ``is_active`` value differs are skipped.
            is_paused: When not None, DAGs whose listed ``is_paused`` value differs are skipped.

        Returns:
            A dict with the detailed DAG entries (``dags_detailed``), counters,
            ``processing_stats`` (including per-DAG ``errors`` and
            ``latest_run_errors``), the filters that were applied, and the
            pagination fields reported by ``list_dags_internal``.

        Note:
            ``is_active`` / ``is_paused`` are applied here, after the listing
            call, so a page may yield fewer than ``limit`` entries.
        """
        dag_list_result = await list_dags_internal(
            limit=limit,
            offset=offset,
            fetch_all=fetch_all,
            id_contains=id_contains,
            name_contains=name_contains
        )
        dags_basic = dag_list_result.get("dags", [])

        detailed_dags = []
        errors = []
        latest_run_errors = []
        skipped_count = 0

        for dag_basic in dags_basic:
            dag_id = dag_basic.get("dag_id")
            if not dag_id:
                skipped_count += 1
                continue
            if is_active is not None and dag_basic.get("is_active") != is_active:
                skipped_count += 1
                continue
            if is_paused is not None and dag_basic.get("is_paused") != is_paused:
                skipped_count += 1
                continue

            try:
                detailed_dag = await get_dag_detailed_info(dag_id)

                # The latest-run lookup is best-effort: a failure must not drop
                # the DAG from the result. It is recorded, though, so callers can
                # tell "lookup failed" apart from "DAG has never run".
                try:
                    latest_run = await _fetch_latest_dag_run(dag_id)
                except Exception as run_exc:
                    latest_run = None
                    latest_run_errors.append({
                        "dag_id": dag_id,
                        "error": str(run_exc)
                    })

                detailed_dag["latest_dag_run"] = latest_run
                detailed_dags.append(detailed_dag)
            except Exception as e:
                # One broken DAG must not abort the whole batch.
                errors.append({
                    "dag_id": dag_id,
                    "error": str(e)
                })

        success_count = len(detailed_dags)

        return {
            "dags_detailed": detailed_dags,
            "total_processed": success_count,
            "total_available": dag_list_result.get("total_entries", 0),
            "returned_count": len(detailed_dags),
            "processing_stats": {
                "success_count": success_count,
                "error_count": len(errors),
                "skipped_count": skipped_count,
                "errors": errors,
                "latest_run_errors": latest_run_errors
            },
            "applied_filters": {
                "id_contains": id_contains,
                "name_contains": name_contains,
                "is_active": is_active,
                "is_paused": is_paused,
                "limit": limit,
                "offset": offset,
                "fetch_all": fetch_all
            },
            "pagination_info": dag_list_result.get("pagination_info", {}),
            "has_more_pages": dag_list_result.get("has_more_pages", False),
            "next_offset": dag_list_result.get("next_offset")
        }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/call518/MCP-Airflow-API'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.