get_dags_detailed_batch
Retrieve detailed information for multiple Airflow DAGs in batch, including latest run data, to monitor and analyze workflow performance.
Instructions
[Tool Role]: Retrieves detailed information for multiple DAGs in batch with latest run information.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| limit | No | ||
| offset | No | ||
| fetch_all | No | ||
| id_contains | No | ||
| name_contains | No | ||
| is_active | No | ||
| is_paused | No |
Implementation Reference
- The core handler function for the 'get_dags_detailed_batch' MCP tool. This async function retrieves detailed information for multiple DAGs in batch, including their latest DAG run details. It uses helper functions like list_dags_internal and get_dag_detailed_info, applies filters, handles errors, and returns comprehensive statistics and results.@mcp.tool() async def get_dags_detailed_batch( limit: int = 100, offset: int = 0, fetch_all: bool = False, id_contains: Optional[str] = None, name_contains: Optional[str] = None, is_active: Optional[bool] = None, is_paused: Optional[bool] = None ) -> Dict[str, Any]: """ [Tool Role]: Retrieves detailed information for multiple DAGs in batch with latest run information. """ dag_list_result = await list_dags_internal( limit=limit, offset=offset, fetch_all=fetch_all, id_contains=id_contains, name_contains=name_contains ) dags_basic = dag_list_result.get("dags", []) detailed_dags = [] success_count = 0 error_count = 0 errors = [] skipped_count = 0 for dag_basic in dags_basic: dag_id = dag_basic.get("dag_id") if not dag_id: skipped_count += 1 continue if is_active is not None and dag_basic.get("is_active") != is_active: skipped_count += 1 continue if is_paused is not None and dag_basic.get("is_paused") != is_paused: skipped_count += 1 continue try: detailed_dag = await get_dag_detailed_info(dag_id) try: latest_run_resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date") latest_run_resp.raise_for_status() latest_runs = latest_run_resp.json().get("dag_runs", []) if latest_runs: latest_run = latest_runs[0] detailed_dag["latest_dag_run"] = { "run_id": latest_run.get("run_id"), "run_type": latest_run.get("run_type"), "state": latest_run.get("state"), "execution_date": latest_run.get("execution_date"), "start_date": latest_run.get("start_date"), "end_date": latest_run.get("end_date"), "data_interval_start": latest_run.get("data_interval_start"), "data_interval_end": latest_run.get("data_interval_end"), "external_trigger": latest_run.get("external_trigger"), "conf": latest_run.get("conf"), "note": latest_run.get("note") } else: detailed_dag["latest_dag_run"] = None except Exception: detailed_dag["latest_dag_run"] = None detailed_dags.append(detailed_dag) success_count += 1 except Exception as e: error_count += 1 errors.append({ "dag_id": dag_id, "error": str(e) }) return { "dags_detailed": detailed_dags, "total_processed": success_count, "total_available": dag_list_result.get("total_entries", 0), "returned_count": len(detailed_dags), "processing_stats": { "success_count": success_count, "error_count": error_count, "skipped_count": skipped_count, "errors": errors }, "applied_filters": { "id_contains": id_contains, "name_contains": name_contains, "is_active": is_active, "is_paused": is_paused, "limit": limit, "offset": offset, "fetch_all": fetch_all }, "pagination_info": dag_list_result.get("pagination_info", {}), "has_more_pages": dag_list_result.get("has_more_pages", False), "next_offset": dag_list_result.get("next_offset") }