Skip to main content
Glama
debugger.py8.06 kB
""" Workflow Debugger Service - The Doctor Handles execution analysis and deep error diagnosis for auto-healing. """ import json from typing import Optional, List, Dict, Any from app.core.client import get_client, safe_tool from app.core.logging import debugger_logger as logger def _extract_error_details(execution_data: Dict[str, Any]) -> Dict[str, Any]: """ Deep dive into execution data to extract precise error information. Navigates through resultData.runData to find the failing node. """ error_info = { "failed_node": "Unknown", "node_type": "Unknown", "error_type": "Unknown", "error_message": "No error details available", "stack_trace": None, "input_data": None } if "data" not in execution_data: return error_info result_data = execution_data["data"].get("resultData", {}) # Primary error location (top-level error object) if "error" in result_data: top_error = result_data["error"] error_info["failed_node"] = top_error.get("node", {}).get("name", "Unknown Node") error_info["node_type"] = top_error.get("node", {}).get("type", "Unknown") error_info["error_message"] = top_error.get("message", "No message") error_info["stack_trace"] = top_error.get("stack") # Try to get error type from description if "description" in top_error: error_info["error_type"] = top_error["description"] # Deep dive into runData for more details run_data = result_data.get("runData", {}) for node_name, node_runs in run_data.items(): for run_index, run in enumerate(node_runs): if "error" in run: node_error = run["error"] # This is the failing node error_info["failed_node"] = node_name if isinstance(node_error, dict): error_info["error_message"] = node_error.get("message", str(node_error)) error_info["error_type"] = node_error.get("name", "Error") error_info["stack_trace"] = node_error.get("stack") else: error_info["error_message"] = str(node_error) # Extract input data that caused the failure if "inputData" in run: error_info["input_data"] = run["inputData"] elif "source" in run: error_info["input_data"] = run["source"] # Found the error, no need to continue break return error_info @safe_tool async def diagnose_execution(execution_id: str) -> str: """ Deep dive diagnosis of a specific execution. Navigates the execution JSON to find exactly what failed and why. Args: execution_id: The ID of the execution to analyze. Returns: JSON string with detailed diagnosis including: - Failed node name and type - Error type and message - Input data that caused the failure - Stack trace (if available) """ logger.info(f"Diagnosing execution: {execution_id}") client = get_client() execution = await client.get(f"/executions/{execution_id}", params={"includeData": "true"}) # Extract workflow info workflow_info = { "workflow_id": execution.get("workflowId"), "workflow_name": execution.get("workflowData", {}).get("name", "Unknown"), "started_at": execution.get("startedAt"), "finished_at": execution.get("stoppedAt"), "status": execution.get("status"), "mode": execution.get("mode") } # Deep dive for error details error_details = _extract_error_details(execution) diagnosis = { "execution_id": execution_id, "workflow": workflow_info, "diagnosis": error_details, "recommendation": _generate_recommendation(error_details) } logger.info(f"Diagnosis complete: Node '{error_details['failed_node']}' failed with: {error_details['error_message'][:100]}") return json.dumps(diagnosis, indent=2) def _generate_recommendation(error_info: Dict[str, Any]) -> str: """ Generate an AI-friendly recommendation based on the error type. """ message = error_info.get("error_message", "").lower() node = error_info.get("failed_node", "") if "404" in message or "not found" in message: return f"The endpoint or resource in node '{node}' returned 404. Check the URL path or resource ID." if "401" in message or "unauthorized" in message: return f"Authentication failed in node '{node}'. Verify API credentials or tokens." if "timeout" in message: return f"Node '{node}' timed out. Consider increasing timeout or checking endpoint availability." if "json" in message or "parse" in message: return f"Node '{node}' received invalid JSON. Check the data format from the previous node." if "undefined" in message or "property" in message: return f"Node '{node}' tried to access a missing property. Check input data structure." return f"Review the error in node '{node}' and verify its configuration and input data." @safe_tool async def analyze_execution_failures( workflow_id: Optional[str] = None, limit: int = 5 ) -> str: """ Analyze multiple failed executions to find patterns. Args: workflow_id: Optional workflow ID to filter by. limit: Maximum number of failed executions to analyze. Returns: JSON string with analysis of all failed executions. """ logger.info(f"Analyzing up to {limit} failed executions" + (f" for workflow {workflow_id}" if workflow_id else "")) params = { "includeData": "true", "status": "error", "limit": limit } if workflow_id: params["workflowId"] = workflow_id client = get_client() data = await client.get("/executions", params=params) executions = data.get("data", []) analyses = [] for exc in executions: error_details = _extract_error_details(exc) analyses.append({ "execution_id": exc["id"], "workflow_id": exc.get("workflowId"), "workflow_name": exc.get("workflowData", {}).get("name"), "started_at": exc.get("startedAt"), "failed_node": error_details["failed_node"], "error_type": error_details["error_type"], "error_message": error_details["error_message"], "recommendation": _generate_recommendation(error_details) }) logger.info(f"Analyzed {len(analyses)} failed executions") return json.dumps(analyses, indent=2) @safe_tool async def get_execution_history( workflow_id: Optional[str] = None, limit: int = 10, status: Optional[str] = None ) -> str: """ Get execution history for workflows. Args: workflow_id: Optional workflow ID to filter by. limit: Maximum number of executions to return. status: Optional status filter ('success', 'error', 'waiting'). Returns: JSON string with execution summaries. """ params = {"limit": limit} if workflow_id: params["workflowId"] = workflow_id if status: params["status"] = status client = get_client() data = await client.get("/executions", params=params) executions = data.get("data", []) summaries = [] for exc in executions: summaries.append({ "id": exc["id"], "workflow_id": exc.get("workflowId"), "workflow_name": exc.get("workflowData", {}).get("name"), "status": exc.get("status"), "started_at": exc.get("startedAt"), "finished_at": exc.get("stoppedAt"), "mode": exc.get("mode") }) return json.dumps(summaries, indent=2)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SrAndres629/n8n_dev_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server