all_dag_import_summary
Identify and resolve DAG import errors in an Airflow cluster by retrieving a summary of issues: errors grouped by filename, the total error count, and the number of affected files.
Instructions
[Tool Role]: Retrieves import error summary for all DAGs.
Returns: Summary of import errors grouped by filename: import_errors_summary, total_errors, files_with_errors
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
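The tool takes no arguments; a call returns the aggregated summary directly. The key names below come from the handler's return statement shown under Implementation Reference; the filename, id, timestamp, and stacktrace values are purely illustrative:

```python
# Illustrative shape of an all_dag_import_summary response (values are made up).
example_response = {
    "import_errors_summary": {
        "dags/example_etl.py": [
            {
                "id": 42,
                "timestamp": "2024-05-01T08:15:00+00:00",
                "stacktrace": "Traceback (most recent call last):\n  ...\nModuleNotFoundError: No module named 'some_missing_module'...",
            }
        ]
    },
    "total_errors": 1,       # total import errors returned by /importErrors
    "files_with_errors": 1,  # number of distinct DAG files with at least one error
}
```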
Implementation Reference
- The primary handler implementation for the `all_dag_import_summary` MCP tool. This function queries the Airflow REST API's /importErrors endpoint, aggregates errors by filename, truncates long stacktraces for the summary view, and returns a structured dictionary with the summary data, total error count, and number of affected files.

```python
@mcp.tool()
async def all_dag_import_summary() -> Dict[str, Any]:
    """[Tool Role]: Provides summary of import errors across all DAGs."""
    resp = await airflow_request("GET", "/importErrors?limit=1000")
    resp.raise_for_status()
    data = resp.json()

    import_summary = {}
    for error in data.get("import_errors", []):
        filename = error.get("filename", "unknown")
        if filename not in import_summary:
            import_summary[filename] = []
        import_summary[filename].append({
            "id": error.get("id"),
            "timestamp": error.get("timestamp"),
            "stacktrace": error.get("stacktrace", "")[:200] + "..."
        })

    return {
        "import_errors_summary": import_summary,
        "total_errors": len(data.get("import_errors", [])),
        "files_with_errors": len(import_summary)
    }
```
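The handler relies on a module-level airflow_request coroutine that is injected at registration time by the version-specific modules shown below; its real implementation lives elsewhere in the package. As a rough orientation only, a minimal sketch of such a helper, assuming an httpx-based client and hypothetical environment variable names (AIRFLOW_API_URL, AIRFLOW_API_USERNAME, AIRFLOW_API_PASSWORD), could look like this:

```python
import os
import httpx

# Hypothetical sketch, not the package's actual implementation: an Airflow REST API v1
# request helper with HTTP basic auth, returning the raw httpx.Response so callers can
# use resp.raise_for_status() and resp.json() exactly as the tool handlers do.
async def airflow_request_v1(method: str, path: str, **kwargs) -> httpx.Response:
    base_url = os.environ["AIRFLOW_API_URL"].rstrip("/") + "/api/v1"
    auth = (os.environ["AIRFLOW_API_USERNAME"], os.environ["AIRFLOW_API_PASSWORD"])
    async with httpx.AsyncClient(base_url=base_url, auth=auth, timeout=30.0) as client:
        return await client.request(method, path, **kwargs)
```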
- src/mcp_airflow_api/tools/v1_tools.py:13-28 (registration): For Airflow API v1, this registration function sets the v1-specific airflow_request function and then calls common_tools.register_common_tools(mcp), which defines and registers the all_dag_import_summary tool among others.

```python
def register_tools(mcp):
    """Register v1 tools by importing common tools with v1 request function."""
    logger.info("Initializing MCP server for Airflow API v1")
    logger.info("Loading Airflow API v1 tools (Airflow 2.x)")

    # Set the global request function to v1
    common_tools.airflow_request = airflow_request_v1

    # Register all 56 common tools (includes management tools)
    common_tools.register_common_tools(mcp)

    # V1 has no exclusive tools - all tools are shared with v2
    logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
```
- src/mcp_airflow_api/tools/v2_tools.py:14-25 (registration): For Airflow API v2, this registration function sets the v2-specific airflow_request function and then calls common_tools.register_common_tools(mcp), which defines and registers the all_dag_import_summary tool.

```python
def register_tools(mcp):
    """Register v2 tools: common tools + v2-exclusive asset tools."""
    logger.info("Initializing MCP server for Airflow API v2")
    logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")

    # Set the global request function to v2
    common_tools.airflow_request = airflow_request_v2

    # Register all 43 common tools
    common_tools.register_common_tools(mcp)
```
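Either registration path is invoked from the server entrypoint once the target Airflow API version is known. The @mcp.tool() decorators suggest a FastMCP-style server object; a minimal, hypothetical wiring sketch (the AIRFLOW_API_VERSION variable name is an assumption, not part of the package) might look like:

```python
import os
from mcp.server.fastmcp import FastMCP

from mcp_airflow_api.tools import v1_tools, v2_tools

# Hypothetical entrypoint sketch: pick the tool set matching the target Airflow deployment.
mcp = FastMCP("mcp-airflow-api")

if os.environ.get("AIRFLOW_API_VERSION", "v1") == "v2":
    v2_tools.register_tools(mcp)  # Airflow 3.0+ (REST API v2)
else:
    v1_tools.register_tools(mcp)  # Airflow 2.x (REST API v1)

if __name__ == "__main__":
    mcp.run()
```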
- src/mcp_airflow_api/tools/common_tools.py:21-441 (registration): The register_common_tools function defines all common tools, including all_dag_import_summary, with @mcp.tool() decorators inside it, so they are registered when it is called from v1_tools or v2_tools.

```python
def register_common_tools(mcp):
    """Register all 43 common tools that work with both v1 and v2 APIs."""
    if airflow_request is None:
        raise RuntimeError("airflow_request function must be set before registering common tools")

    logger.info("Registering common tools shared between v1 and v2")

    # Template & Prompt Management
    @mcp.tool()
    async def get_prompt_template(section: Optional[str] = None, mode: Optional[str] = None) -> str:
        """
        [Tool Role]: Provides comprehensive prompt template for LLM interactions with Airflow operations.

        Args:
            section: Optional section name to get specific part of template
            mode: Optional mode (summary/detailed) to control response verbosity

        Returns:
            Comprehensive template or specific section for optimal LLM guidance
        """
        template = read_prompt_template(PROMPT_TEMPLATE_PATH)
        if section:
            sections = parse_prompt_sections(template)
            for i, s in enumerate(sections):
                if section.lower() in s.lower():
                    return sections[i + 1]  # +1 to skip the title section
            return f"Section '{section}' not found."
        return template

    # DAG Management (11 tools)
    @mcp.tool()
    async def list_dags(limit: int = 20, offset: int = 0, fetch_all: bool = False,
                        id_contains: Optional[str] = None, name_contains: Optional[str] = None) -> Dict[str, Any]:
        """
        [Tool Role]: Lists all DAGs registered in the Airflow cluster with pagination support.

        Args:
            limit: Maximum number of DAGs to return (default: 20)
            offset: Number of DAGs to skip for pagination (default: 0)
            fetch_all: If True, fetches all DAGs regardless of limit/offset
            id_contains: Filter DAGs by ID containing this string
            name_contains: Filter DAGs by display name containing this string

        Returns:
            Dict containing dags list, pagination info, and total counts
        """
        return await list_dags_internal(limit, offset, fetch_all, id_contains, name_contains)

    @mcp.tool()
    async def get_dag(dag_id: str) -> Dict[str, Any]:
        """
        [Tool Role]: Retrieves detailed information for a specific DAG.

        Args:
            dag_id: The DAG ID to get details for

        Returns:
            Comprehensive DAG details
        """
        return await get_dag_detailed_info(dag_id)

    @mcp.tool()
    async def get_dags_detailed_batch(
        limit: int = 100,
        offset: int = 0,
        fetch_all: bool = False,
        id_contains: Optional[str] = None,
        name_contains: Optional[str] = None,
        is_active: Optional[bool] = None,
        is_paused: Optional[bool] = None
    ) -> Dict[str, Any]:
        """
        [Tool Role]: Retrieves detailed information for multiple DAGs in batch with latest run information.
        """
        dag_list_result = await list_dags_internal(
            limit=limit,
            offset=offset,
            fetch_all=fetch_all,
            id_contains=id_contains,
            name_contains=name_contains
        )

        dags_basic = dag_list_result.get("dags", [])
        detailed_dags = []
        success_count = 0
        error_count = 0
        errors = []
        skipped_count = 0

        for dag_basic in dags_basic:
            dag_id = dag_basic.get("dag_id")
            if not dag_id:
                skipped_count += 1
                continue
            if is_active is not None and dag_basic.get("is_active") != is_active:
                skipped_count += 1
                continue
            if is_paused is not None and dag_basic.get("is_paused") != is_paused:
                skipped_count += 1
                continue
            try:
                detailed_dag = await get_dag_detailed_info(dag_id)
                try:
                    latest_run_resp = await airflow_request(
                        "GET", f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date")
                    latest_run_resp.raise_for_status()
                    latest_runs = latest_run_resp.json().get("dag_runs", [])
                    if latest_runs:
                        latest_run = latest_runs[0]
                        detailed_dag["latest_dag_run"] = {
                            "run_id": latest_run.get("run_id"),
                            "run_type": latest_run.get("run_type"),
                            "state": latest_run.get("state"),
                            "execution_date": latest_run.get("execution_date"),
                            "start_date": latest_run.get("start_date"),
                            "end_date": latest_run.get("end_date"),
                            "data_interval_start": latest_run.get("data_interval_start"),
                            "data_interval_end": latest_run.get("data_interval_end"),
                            "external_trigger": latest_run.get("external_trigger"),
                            "conf": latest_run.get("conf"),
                            "note": latest_run.get("note")
                        }
                    else:
                        detailed_dag["latest_dag_run"] = None
                except Exception:
                    detailed_dag["latest_dag_run"] = None
                detailed_dags.append(detailed_dag)
                success_count += 1
            except Exception as e:
                error_count += 1
                errors.append({
                    "dag_id": dag_id,
                    "error": str(e)
                })

        return {
            "dags_detailed": detailed_dags,
            "total_processed": success_count,
            "total_available": dag_list_result.get("total_entries", 0),
            "returned_count": len(detailed_dags),
            "processing_stats": {
                "success_count": success_count,
                "error_count": error_count,
                "skipped_count": skipped_count,
                "errors": errors
            },
            "applied_filters": {
                "id_contains": id_contains,
                "name_contains": name_contains,
                "is_active": is_active,
                "is_paused": is_paused,
                "limit": limit,
                "offset": offset,
                "fetch_all": fetch_all
            },
            "pagination_info": dag_list_result.get("pagination_info", {}),
            "has_more_pages": dag_list_result.get("has_more_pages", False),
            "next_offset": dag_list_result.get("next_offset")
        }

    @mcp.tool()
    async def running_dags() -> Dict[str, Any]:
        """[Tool Role]: Lists all currently running DAG runs in the Airflow cluster."""
        resp = await airflow_request("GET", "/dags/~/dagRuns?state=running&limit=1000&order_by=-start_date")
        resp.raise_for_status()
        data = resp.json()
        running_runs = []
        for run in data.get("dag_runs", []):
            run_info = {
                "dag_id": run.get("dag_id"),
                "dag_display_name": run.get("dag_display_name"),
                "run_id": run.get("run_id"),
                "run_type": run.get("run_type"),
                "state": run.get("state"),
                "execution_date": run.get("execution_date"),
                "start_date": run.get("start_date"),
                "end_date": run.get("end_date"),
                "data_interval_start": run.get("data_interval_start"),
                "data_interval_end": run.get("data_interval_end"),
                "external_trigger": run.get("external_trigger"),
                "conf": run.get("conf"),
                "note": run.get("note")
            }
            running_runs.append(run_info)
        return {
            "dag_runs": running_runs,
            "total_running": len(running_runs),
            "query_info": {
                "state_filter": "running",
                "limit": 1000,
                "order_by": "start_date (descending)"
            }
        }

    @mcp.tool()
    async def failed_dags() -> Dict[str, Any]:
        """[Tool Role]: Lists all recently failed DAG runs in the Airflow cluster."""
        resp = await airflow_request("GET", "/dags/~/dagRuns?state=failed&limit=1000&order_by=-start_date")
        resp.raise_for_status()
        data = resp.json()
        failed_runs = []
        for run in data.get("dag_runs", []):
            run_info = {
                "dag_id": run.get("dag_id"),
                "dag_display_name": run.get("dag_display_name"),
                "run_id": run.get("run_id"),
                "run_type": run.get("run_type"),
                "state": run.get("state"),
                "execution_date": run.get("execution_date"),
                "start_date": run.get("start_date"),
                "end_date": run.get("end_date"),
                "data_interval_start": run.get("data_interval_start"),
                "data_interval_end": run.get("data_interval_end"),
                "external_trigger": run.get("external_trigger"),
                "conf": run.get("conf"),
                "note": run.get("note")
            }
            failed_runs.append(run_info)
        return {
            "dag_runs": failed_runs,
            "total_failed": len(failed_runs),
            "query_info": {
                "state_filter": "failed",
                "limit": 1000,
                "order_by": "start_date (descending)"
            }
        }

    @mcp.tool()
    async def trigger_dag(dag_id: str) -> Dict[str, Any]:
        """[Tool Role]: Triggers a new DAG run for a specified Airflow DAG."""
        if not dag_id:
            raise ValueError("dag_id must not be empty")
        resp = await airflow_request("POST", f"/dags/{dag_id}/dagRuns", json={"conf": {}})
        resp.raise_for_status()
        run = resp.json()
        return {
            "dag_id": dag_id,
            "run_id": run.get("run_id"),
            "state": run.get("state"),
            "execution_date": run.get("execution_date"),
            "start_date": run.get("start_date"),
            "end_date": run.get("end_date")
        }

    @mcp.tool()
    async def pause_dag(dag_id: str) -> Dict[str, Any]:
        """[Tool Role]: Pauses the specified Airflow DAG (prevents scheduling new runs)."""
        if not dag_id:
            raise ValueError("dag_id must not be empty")
        resp = await airflow_request("PATCH", f"/dags/{dag_id}", json={"is_paused": True})
        resp.raise_for_status()
        dag_data = resp.json()
        return {
            "dag_id": dag_id,
            "is_paused": dag_data.get("is_paused")
        }

    @mcp.tool()
    async def unpause_dag(dag_id: str) -> Dict[str, Any]:
        """[Tool Role]: Unpauses the specified Airflow DAG (allows scheduling new runs)."""
        if not dag_id:
            raise ValueError("dag_id must not be empty")
        resp = await airflow_request("PATCH", f"/dags/{dag_id}", json={"is_paused": False})
        resp.raise_for_status()
        dag_data = resp.json()
        return {
            "dag_id": dag_id,
            "is_paused": dag_data.get("is_paused")
        }

    @mcp.tool()
    async def dag_graph(dag_id: str) -> Dict[str, Any]:
        """[Tool Role]: Retrieves task graph structure for the specified DAG."""
        if not dag_id:
            raise ValueError("dag_id must not be empty")
        resp = await airflow_request("GET", f"/dags/{dag_id}/tasks")
        resp.raise_for_status()
        tasks_data = resp.json()
        tasks = tasks_data.get("tasks", [])
        task_graph = {}
        for task in tasks:
            task_id = task.get("task_id")
            task_graph[task_id] = {
                "task_id": task_id,
                "task_type": task.get("class_ref", {}).get("class_name"),
                "downstream_task_ids": task.get("downstream_task_ids", []),
                "upstream_task_ids": task.get("upstream_task_ids", [])
            }
        return {
            "dag_id": dag_id,
            "task_graph": task_graph,
            "total_tasks": len(tasks),
            "task_relationships": {
                "nodes": list(task_graph.keys()),
                "edges": [(task_id, downstream)
                          for task_id, task_info in task_graph.items()
                          for downstream in task_info["downstream_task_ids"]]
            }
        }

    @mcp.tool()
    async def list_tasks(dag_id: str) -> Dict[str, Any]:
        """[Tool Role]: Lists all tasks within the specified DAG."""
        if not dag_id:
            raise ValueError("dag_id must not be empty")
        resp = await airflow_request("GET", f"/dags/{dag_id}/tasks")
        resp.raise_for_status()
        return resp.json()

    @mcp.tool()
    async def dag_code(dag_id: str) -> Dict[str, Any]:
        """[Tool Role]: Retrieves the source code for the specified DAG."""
        if not dag_id:
            raise ValueError("dag_id must not be empty")
        resp = await airflow_request("GET", f"/dagSources/{dag_id}")
        resp.raise_for_status()
        return resp.json()

    # Continue with remaining tools...

    # Event/Log Management (3 tools)
    @mcp.tool()
    async def list_event_logs(dag_id: Optional[str] = None, limit: int = 20, offset: int = 0) -> Dict[str, Any]:
        """[Tool Role]: Lists event logs from Airflow."""
        params = {'limit': limit, 'offset': offset}
        if dag_id:
            params['dag_id'] = dag_id
        query_string = "&".join([f"{k}={v}" for k, v in params.items()])
        resp = await airflow_request("GET", f"/eventLogs?{query_string}")
        resp.raise_for_status()
        return resp.json()

    @mcp.tool()
    async def get_event_log(event_log_id: int) -> Dict[str, Any]:
        """[Tool Role]: Retrieves a specific event log entry."""
        resp = await airflow_request("GET", f"/eventLogs/{event_log_id}")
        resp.raise_for_status()
        return resp.json()

    @mcp.tool()
    async def all_dag_event_summary() -> Dict[str, Any]:
        """[Tool Role]: Provides summary of event logs across all DAGs."""
        resp = await airflow_request("GET", "/eventLogs?limit=1000")
        resp.raise_for_status()
        data = resp.json()
        event_summary = {}
        for event in data.get("event_logs", []):
            dag_id = event.get("dag_id", "unknown")
            event_type = event.get("event", "unknown")
            if dag_id not in event_summary:
                event_summary[dag_id] = {}
            if event_type not in event_summary[dag_id]:
                event_summary[dag_id][event_type] = 0
            event_summary[dag_id][event_type] += 1
        return {
            "event_summary": event_summary,
            "total_events": len(data.get("event_logs", [])),
            "unique_dags": len(event_summary)
        }

    # Import Error Management (3 tools)
    @mcp.tool()
    async def list_import_errors(limit: int = 20, offset: int = 0) -> Dict[str, Any]:
        """[Tool Role]: Lists import errors in Airflow."""
        params = {'limit': limit, 'offset': offset}
        query_string = "&".join([f"{k}={v}" for k, v in params.items()])
        resp = await airflow_request("GET", f"/importErrors?{query_string}")
        resp.raise_for_status()
        return resp.json()

    @mcp.tool()
    async def get_import_error(import_error_id: int) -> Dict[str, Any]:
        """[Tool Role]: Retrieves a specific import error."""
        resp = await airflow_request("GET", f"/importErrors/{import_error_id}")
        resp.raise_for_status()
        return resp.json()

    @mcp.tool()
    async def all_dag_import_summary() -> Dict[str, Any]:
        """[Tool Role]: Provides summary of import errors across all DAGs."""
        resp = await airflow_request("GET", "/importErrors?limit=1000")
        resp.raise_for_status()
        data = resp.json()
        import_summary = {}
        for error in data.get("import_errors", []):
            filename = error.get("filename", "unknown")
            if filename not in import_summary:
                import_summary[filename] = []
            import_summary[filename].append({
                "id": error.get("id"),
                "timestamp": error.get("timestamp"),
                "stacktrace": error.get("stacktrace", "")[:200] + "..."
            })
        return {
            "import_errors_summary": import_summary,
            "total_errors": len(data.get("import_errors", [])),
            "files_with_errors": len(import_summary)
        }
```
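The summary deliberately truncates each stacktrace to roughly its first 200 characters to keep the response compact. Once a problematic file has been identified from import_errors_summary, the companion list_import_errors and get_import_error tools (defined in the same registration function) return the full import error records, including complete stacktraces.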