get_dags_detailed_batch
Retrieve detailed information and latest execution data for multiple DAGs in batch, combining static configuration and dynamic runtime details for streamlined Airflow management.
Instructions
[Tool Role]: Retrieves detailed information for multiple DAGs in batch with get_dag() level detail plus latest run information.
This tool combines list_dags() filtering with get_dag() detailed information retrieval, providing comprehensive DAG details AND latest execution information for multiple DAGs in a single response. Each DAG entry includes both static configuration details and dynamic runtime information.
Args:
- `limit`: Maximum number of DAGs to process (default: 100). Use higher values (500-1000) for large environments. Ignored when `fetch_all=True`.
- `offset`: Number of DAGs to skip for pagination (default: 0).
- `fetch_all`: If True, fetches all DAGs regardless of limit/offset (default: False).
- `id_contains`: Filter DAGs by ID containing this string (optional).
- `name_contains`: Filter DAGs by display name containing this string (optional).
- `is_active`: Filter by active status - True/False (optional).
- `is_paused`: Filter by paused status - True/False (optional).
Usage Examples:
- All unpaused DAGs with full details and latest runs: `get_dags_detailed_batch(fetch_all=True, is_paused=False)`
- Active, unpaused DAGs only: `get_dags_detailed_batch(is_active=True, is_paused=False)`
- DAGs containing "example": `get_dags_detailed_batch(id_contains="example", limit=50)`
- Paginated batch: `get_dags_detailed_batch(limit=100, offset=200)`
Returns: Dictionary containing:
- `dags_detailed`: List of detailed DAG objects with:
  - All get_dag() fields (dag_id, schedule_interval, start_date, owners, tags, etc.)
  - `latest_dag_run`: Most recent execution information (run_id, state, start_date, end_date, etc.)
- `total_processed`: Number of DAGs successfully processed
- `total_available`: Total number of DAGs matching initial filters
- `processing_stats`: Success/failure counts and error details
- `applied_filters`: Summary of filters applied
- `pagination_info`: Current page info and remaining counts
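To make the return shape concrete, the sketch below summarizes latest-run states from a result dictionary. The helper and the sample data are illustrative (hand-built to match the documented fields), not real API output:

```python
from collections import Counter


def summarize_run_states(result: dict) -> Counter:
    """Count latest-run states across the batch; DAGs with no runs count as 'no_runs'."""
    states = Counter()
    for dag in result.get("dags_detailed", []):
        run = dag.get("latest_dag_run")
        states[run["state"] if run else "no_runs"] += 1
    return states


# Hypothetical result trimmed to the fields this helper reads
sample = {
    "dags_detailed": [
        {"dag_id": "etl_daily", "latest_dag_run": {"state": "success"}},
        {"dag_id": "etl_hourly", "latest_dag_run": {"state": "failed"}},
        {"dag_id": "new_dag", "latest_dag_run": None},
    ],
    "total_processed": 3,
}
print(summarize_run_states(sample))
# → Counter({'success': 1, 'failed': 1, 'no_runs': 1})
```

This kind of post-processing is useful when a client needs a fleet-health overview rather than per-DAG detail.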
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| fetch_all | No | If True, fetches all DAGs regardless of limit/offset | False |
| id_contains | No | Filter DAGs by ID containing this string | |
| is_active | No | Filter by active status (True/False) | |
| is_paused | No | Filter by paused status (True/False) | |
| limit | No | Maximum number of DAGs to process; ignored when fetch_all=True | 100 |
| name_contains | No | Filter DAGs by display name containing this string | |
| offset | No | Number of DAGs to skip for pagination | 0 |
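As a concrete reading of the schema above, an MCP client would serialize the tool arguments as JSON. This is a hypothetical call payload (the envelope shape follows the common `name`/`arguments` convention for MCP tool calls); omitted optional fields fall back to the defaults in the schema:

```python
import json

# Hypothetical arguments for a paginated, filtered batch call
arguments = {
    "limit": 100,
    "offset": 200,
    "is_paused": False,
    "id_contains": "etl",
}
payload = json.dumps({"name": "get_dags_detailed_batch", "arguments": arguments})
print(payload)
```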
Implementation Reference
- The core handler function for the 'get_dags_detailed_batch' tool. It fetches a batch of DAGs using list_dags_internal, enriches each with detailed info from get_dag_detailed_info, adds the latest DAG run information via the Airflow API, applies filters, and returns comprehensive statistics and pagination info.

```python
@mcp.tool()
async def get_dags_detailed_batch(
    limit: int = 100,
    offset: int = 0,
    fetch_all: bool = False,
    id_contains: Optional[str] = None,
    name_contains: Optional[str] = None,
    is_active: Optional[bool] = None,
    is_paused: Optional[bool] = None,
) -> Dict[str, Any]:
    """
    [Tool Role]: Retrieves detailed information for multiple DAGs in batch
    with latest run information.
    """
    dag_list_result = await list_dags_internal(
        limit=limit,
        offset=offset,
        fetch_all=fetch_all,
        id_contains=id_contains,
        name_contains=name_contains,
    )
    dags_basic = dag_list_result.get("dags", [])

    detailed_dags = []
    success_count = 0
    error_count = 0
    errors = []
    skipped_count = 0

    for dag_basic in dags_basic:
        dag_id = dag_basic.get("dag_id")
        if not dag_id:
            skipped_count += 1
            continue
        # Apply is_active / is_paused filters against the basic listing
        if is_active is not None and dag_basic.get("is_active") != is_active:
            skipped_count += 1
            continue
        if is_paused is not None and dag_basic.get("is_paused") != is_paused:
            skipped_count += 1
            continue
        try:
            detailed_dag = await get_dag_detailed_info(dag_id)
            # Enrich with the most recent DAG run; failures here degrade
            # gracefully to latest_dag_run=None instead of failing the DAG
            try:
                latest_run_resp = await airflow_request(
                    "GET",
                    f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date",
                )
                latest_run_resp.raise_for_status()
                latest_runs = latest_run_resp.json().get("dag_runs", [])
                if latest_runs:
                    latest_run = latest_runs[0]
                    detailed_dag["latest_dag_run"] = {
                        "run_id": latest_run.get("run_id"),
                        "run_type": latest_run.get("run_type"),
                        "state": latest_run.get("state"),
                        "execution_date": latest_run.get("execution_date"),
                        "start_date": latest_run.get("start_date"),
                        "end_date": latest_run.get("end_date"),
                        "data_interval_start": latest_run.get("data_interval_start"),
                        "data_interval_end": latest_run.get("data_interval_end"),
                        "external_trigger": latest_run.get("external_trigger"),
                        "conf": latest_run.get("conf"),
                        "note": latest_run.get("note"),
                    }
                else:
                    detailed_dag["latest_dag_run"] = None
            except Exception:
                detailed_dag["latest_dag_run"] = None
            detailed_dags.append(detailed_dag)
            success_count += 1
        except Exception as e:
            error_count += 1
            errors.append({"dag_id": dag_id, "error": str(e)})

    return {
        "dags_detailed": detailed_dags,
        "total_processed": success_count,
        "total_available": dag_list_result.get("total_entries", 0),
        "returned_count": len(detailed_dags),
        "processing_stats": {
            "success_count": success_count,
            "error_count": error_count,
            "skipped_count": skipped_count,
            "errors": errors,
        },
        "applied_filters": {
            "id_contains": id_contains,
            "name_contains": name_contains,
            "is_active": is_active,
            "is_paused": is_paused,
            "limit": limit,
            "offset": offset,
            "fetch_all": fetch_all,
        },
        "pagination_info": dag_list_result.get("pagination_info", {}),
        "has_more_pages": dag_list_result.get("has_more_pages", False),
        "next_offset": dag_list_result.get("next_offset"),
    }
```
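The latest-run enrichment step in the handler can be isolated as a pure function for testing. The field names mirror the handler's projection of the Airflow `dagRuns` response; the sample body below is illustrative, not real API output:

```python
from typing import Any, Dict, Optional

# Fields the handler projects out of a dagRun object
RUN_FIELDS = (
    "run_id", "run_type", "state", "execution_date", "start_date", "end_date",
    "data_interval_start", "data_interval_end", "external_trigger", "conf", "note",
)


def extract_latest_run(dag_runs_body: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """First run of a limit=1, newest-first dagRuns query, projected; None if empty."""
    runs = dag_runs_body.get("dag_runs", [])
    if not runs:
        return None
    latest = runs[0]
    return {field: latest.get(field) for field in RUN_FIELDS}


# Illustrative response body with only a few fields populated;
# missing fields come back as None, matching the handler's .get() usage
body = {"dag_runs": [{"run_id": "manual__2024-01-01", "state": "success", "conf": {}}]}
run = extract_latest_run(body)
```

Factoring the projection out this way lets it be unit-tested without an Airflow instance or async mocks.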
- src/mcp_airflow_api/tools/v1_tools.py:23-23 (registration): registers the common tools (including get_dags_detailed_batch) for Airflow API v1 by calling `common_tools.register_common_tools(mcp)` on the MCP server instance.
- src/mcp_airflow_api/tools/v2_tools.py:24-24 (registration): registers the common tools (including get_dags_detailed_batch) for Airflow API v2 by calling `common_tools.register_common_tools(mcp)` on the MCP server instance.
- `def register_common_tools(mcp):` - helper function that defines and registers all common tools using `@mcp.tool()` decorators inside it, including the get_dags_detailed_batch handler.