
Treasure Data MCP Server

by knishioka

td_diagnose_workflow

Diagnose workflow failures and performance issues by analyzing execution history to identify root causes, detect patterns, and provide fix recommendations.

Instructions

Health check for workflows - find why they're failing or slow.

Automated troubleshooting that analyzes execution history to identify patterns, calculate health scores, and provide fix recommendations.

Common scenarios:
- Workflow suddenly failing - find the root cause
- Performance degradation - identify slow tasks
- Reliability issues - pattern analysis
- Pre-deployment check - ensure workflow health
- Incident response - quick failure diagnosis

Time windows: "30d", "7d", "24h" for trend analysis
Levels: "basic" (quick stats), "comprehensive" (full analysis)

Returns a health score (0-10), failure patterns, and prioritized fixes.

Args:
- workflow_identifier: Workflow name, ID, or partial match
- time_window: Time period to analyze (e.g., "30d", "7d", "24h")
- diagnostic_level: "basic" for a quick check, "comprehensive" for deep analysis

Returns:
- Health report with score, issues, trends, and optimization recommendations
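
For example, an MCP client could invoke the tool like this. This is a minimal sketch using the official `mcp` Python SDK over stdio; the launch command and the workflow name `daily_etl` are assumptions for illustration, not part of this server's documentation:

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Assumed launch command; adjust to however td-mcp-server is actually started.
    params = StdioServerParameters(command="python", args=["-m", "td_mcp_server"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "td_diagnose_workflow",
                arguments={
                    "workflow_identifier": "daily_etl",  # hypothetical workflow name
                    "time_window": "7d",
                    "diagnostic_level": "comprehensive",
                },
            )
            print(result.content)


asyncio.run(main())
```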

Input Schema

| Name | Required | Description | Default |
| ---- | -------- | ----------- | ------- |
| diagnostic_level | No | "basic" for a quick check, "comprehensive" for deep analysis | basic |
| time_window | No | Time period to analyze (e.g., "30d", "7d", "24h") | 30d |
| workflow_identifier | Yes | Workflow name, ID, or partial match | |
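
Expressed as JSON Schema, the inputs plausibly look like the following. This is a reconstruction from the table above and the function signature, written as a Python dict; the exact schema the server emits may differ:

```python
INPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "workflow_identifier": {
            "type": "string",
            "description": "Workflow name, ID, or partial match",
        },
        "time_window": {
            "type": "string",
            "default": "30d",
            "description": 'Time period to analyze (e.g., "30d", "7d", "24h")',
        },
        "diagnostic_level": {
            "type": "string",
            "default": "basic",
            "description": '"basic" for quick check, "comprehensive" for deep analysis',
        },
    },
    "required": ["workflow_identifier"],
}
```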

Implementation Reference

  • The primary handler function for the 'td_diagnose_workflow' tool. It performs comprehensive workflow diagnosis including metrics calculation, health scoring, failure analysis, issue identification, and actionable recommendations.
```python
# Imports required by this excerpt (defined at module level in the source).
import re
from typing import Any


async def td_diagnose_workflow(
    workflow_identifier: str,
    time_window: str = "30d",
    diagnostic_level: str = "basic",
) -> dict[str, Any]:
    """Health check for workflows - find why they're failing or slow.

    Automated troubleshooting that analyzes execution history to identify
    patterns, calculate health scores, and provide fix recommendations.

    Common scenarios:
    - Workflow suddenly failing - Find root cause
    - Performance degradation - Identify slow tasks
    - Reliability issues - Pattern analysis
    - Pre-deployment check - Ensure workflow health
    - Incident response - Quick failure diagnosis

    Time windows: "30d", "7d", "24h" for trend analysis
    Levels: "basic" (quick stats), "comprehensive" (full analysis)

    Returns health score (0-10), failure patterns, and prioritized fixes.

    Args:
        workflow_identifier: Workflow name, ID, or partial match
        time_window: Time period to analyze (e.g., "30d", "7d", "24h")
        diagnostic_level: "basic" for quick check, "comprehensive" for deep analysis

    Returns:
        Health report with score, issues, trends, and optimization recommendations
    """
    if not workflow_identifier or not workflow_identifier.strip():
        return _format_error_response("Workflow identifier cannot be empty")

    client = _create_client(include_workflow=True)
    if isinstance(client, dict):
        return client

    try:
        # Find the workflow
        workflows = client.get_workflows(count=1000, all_results=True)
        target_workflow = None

        # Try exact ID match first
        if re.match(r"^\d+$", workflow_identifier):
            for w in workflows:
                if w.id == workflow_identifier:
                    target_workflow = w
                    break

        # Try name match
        if not target_workflow:
            workflow_lower = workflow_identifier.lower()
            for w in workflows:
                if workflow_lower in w.name.lower():
                    target_workflow = w
                    break

        if not target_workflow:
            return _format_error_response(
                f"Workflow '{workflow_identifier}' not found"
            )

        # Calculate time range
        start_time = _calculate_time_window(time_window)

        # Initialize diagnosis result
        result: dict[str, Any] = {
            "workflow": {
                "id": target_workflow.id,
                "name": target_workflow.name,
                "project": target_workflow.project.name,
                "timezone": target_workflow.timezone,
                "scheduled": target_workflow.schedule is not None,
            },
            "time_window": time_window,
            "diagnostic_level": diagnostic_level,
        }

        # Add schedule info if available
        if target_workflow.schedule:
            result["workflow"]["schedule"] = target_workflow.schedule

        # Analyze session history
        sessions = []
        total_duration = 0.0
        successful_runs = 0

        for session in target_workflow.latest_sessions:
            session_time = _parse_datetime(session.session_time)
            if start_time and session_time and session_time < start_time:
                continue  # Skip sessions outside time window

            session_info = {
                "session_time": session.session_time,
                "status": session.last_attempt.status,
                "success": session.last_attempt.success,
            }

            # Calculate duration if finished
            if session.last_attempt.created_at and session.last_attempt.finished_at:
                created = _parse_datetime(session.last_attempt.created_at)
                finished = _parse_datetime(session.last_attempt.finished_at)
                if created and finished:
                    duration = (finished - created).total_seconds()
                    session_info["duration_seconds"] = duration
                    total_duration += duration

            sessions.append(session_info)
            if session.last_attempt.success:
                successful_runs += 1

        # Calculate metrics
        total_runs = len(sessions)
        success_rate = successful_runs / total_runs if total_runs > 0 else 0
        avg_duration = total_duration / successful_runs if successful_runs > 0 else 0

        # Analyze failure patterns
        failure_patterns = _analyze_failure_patterns(sessions)

        # Identify issues
        issues = []

        # High failure rate
        if success_rate < 0.8:
            issues.append(
                {
                    "severity": "high",
                    "category": "error_rate",
                    "description": (
                        f"High failure rate: "
                        f"{(1 - success_rate) * 100:.1f}% of runs failing"
                    ),
                    "affected_components": ["workflow_execution"],
                }
            )

        # Resource timeout patterns
        timeout_issues = [
            p for p in failure_patterns if p["type"] == "resource_timeout"
        ]
        if timeout_issues and timeout_issues[0]["count"] > 3:
            issues.append(
                {
                    "severity": "high",
                    "category": "resource_management",
                    "description": "Frequent resource timeouts detected",
                    "recommendation": (
                        "Consider increasing memory allocation or optimizing queries"
                    ),
                    "affected_components": ["resource_allocation"],
                }
            )

        # Performance degradation (would need historical data for proper trend)
        if avg_duration > 3600:  # > 1 hour average
            issues.append(
                {
                    "severity": "medium",
                    "category": "performance",
                    "description": (
                        f"Long average execution time: {avg_duration / 60:.1f} minutes"
                    ),
                    "recommendation": "Review query optimization opportunities",
                    "affected_components": ["query_performance"],
                }
            )

        # Calculate health score
        health_score = _calculate_health_score(
            success_rate, avg_duration, failure_patterns, result["workflow"]
        )

        # Build result
        result["health_score"] = round(health_score, 1)
        result["issues"] = issues
        result["metrics"] = {
            "total_runs": total_runs,
            "successful_runs": successful_runs,
            "success_rate": round(success_rate, 3),
            "avg_duration_minutes": (
                round(avg_duration / 60, 1) if avg_duration > 0 else 0
            ),
        }

        # Add trends (simplified without historical data)
        result["trends"] = {
            "success_rate_trend": "stable",  # Would need historical comparison
            "execution_time_trend": "stable",
            "resource_usage_trend": "unknown",
        }

        # Add failure analysis for comprehensive level
        if diagnostic_level == "comprehensive":
            result["failure_analysis"] = {
                "patterns": failure_patterns,
                "recent_failures": [s for s in sessions if not s["success"]][:10],
            }

            # Add optimization opportunities
            optimization_opportunities = []
            if target_workflow.schedule and avg_duration > 1800:  # > 30 min
                optimization_opportunities.append(
                    {
                        "type": "scheduling",
                        "description": (
                            "Consider adjusting schedule to avoid peak hours"
                        ),
                        "potential_impact": "Reduce resource contention",
                    }
                )

            if len(failure_patterns) > 0:
                optimization_opportunities.append(
                    {
                        "type": "error_handling",
                        "description": "Implement retry logic for transient failures",
                        "potential_impact": (
                            f"Could recover {failure_patterns[0]['count']} failed runs"
                        ),
                    }
                )

            result["optimization_opportunities"] = optimization_opportunities

        # Generate recommendations
        recommendations = _generate_recommendations(
            health_score, issues, result["workflow"]
        )
        result["recommendations"] = recommendations

        return result

    except Exception as e:
        return _format_error_response(f"Failed to diagnose workflow: {str(e)}")
```
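  • The handler leans on several module-private helpers (`_calculate_time_window`, `_parse_datetime`, `_analyze_failure_patterns`, `_calculate_health_score`, `_generate_recommendations`) that are not reproduced on this page. As one illustration, here is a minimal sketch of what `_calculate_time_window` might look like, assuming it converts strings such as "30d" or "24h" into a UTC cutoff; the actual implementation may differ:

```python
import re
from datetime import datetime, timedelta, timezone


def _calculate_time_window(time_window: str) -> datetime | None:
    """Parse a window like '30d', '7d', or '24h' into a UTC start time (sketch)."""
    match = re.match(r"^(\d+)([dh])$", time_window.strip().lower())
    if not match:
        return None  # unrecognized format: no lower bound, analyze everything
    value, unit = int(match.group(1)), match.group(2)
    delta = timedelta(days=value) if unit == "d" else timedelta(hours=value)
    return datetime.now(timezone.utc) - delta
```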
  • Direct registration of the td_diagnose_workflow tool using the MCP decorator within the register_diagnostic_tools function.
```python
mcp.tool()(td_diagnose_workflow)
```
  • Top-level registration call that invokes the diagnostic tools registration, including td_diagnose_workflow.
```python
diagnostic_tools.register_diagnostic_tools(mcp, _create_client, _format_error_response)
```
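  • Putting the pieces together, the top-level server module plausibly looks something like the sketch below. It assumes FastMCP from the official Python SDK; the module name and the bodies of the client factory and error formatter are placeholders, not the project's actual code:

```python
from typing import Any

from mcp.server.fastmcp import FastMCP

import diagnostic_tools  # module defining td_diagnose_workflow (name assumed)

mcp = FastMCP("td-mcp-server")


def _create_client(include_workflow: bool = False) -> Any:
    """Build a Treasure Data API client, or return an error dict on failure."""
    ...  # real implementation (API key handling etc.) omitted here


def _format_error_response(message: str) -> dict[str, Any]:
    return {"error": message, "success": False}


# Registers td_diagnose_workflow (and the other diagnostic tools) on the server.
diagnostic_tools.register_diagnostic_tools(mcp, _create_client, _format_error_response)

if __name__ == "__main__":
    mcp.run()
```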

MCP directory API

We provide all the information about MCP servers via our MCP API.

```bash
curl -X GET 'https://glama.ai/api/mcp/v1/servers/knishioka/td-mcp-server'
```

If you have feedback or need assistance with the MCP directory API, please join our Discord server.