
Treasure Data MCP Server

by knishioka

td_diagnose_workflow

Diagnose workflow issues by analyzing execution history to identify failures, performance problems, and provide fix recommendations with health scores.

Instructions

Health check for workflows - find why they're failing or slow.

Automated troubleshooting that analyzes execution history to identify
patterns, calculate health scores, and provide fix recommendations.

Common scenarios:
- Workflow suddenly failing - Find root cause
- Performance degradation - Identify slow tasks
- Reliability issues - Pattern analysis
- Pre-deployment check - Ensure workflow health
- Incident response - Quick failure diagnosis

Time windows: "30d", "7d", "24h" for trend analysis
Levels: "basic" (quick stats), "comprehensive" (full analysis)

Returns health score (0-10), failure patterns, and prioritized fixes.

Args:
    workflow_identifier: Workflow name, ID, or partial match
    time_window: Time period to analyze (e.g., "30d", "7d", "24h")
    diagnostic_level: "basic" for quick check, "comprehensive" for deep analysis

Returns:
    Health report with score, issues, trends, and optimization recommendations
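
Based on the keys assembled by the handler shown under Implementation Reference, a basic-level report takes roughly this shape (all values illustrative; the element format of `recommendations` comes from a helper not shown here):

```json
{
  "workflow": {
    "id": "12345",
    "name": "daily_sales_rollup",
    "project": "analytics",
    "timezone": "UTC",
    "scheduled": true
  },
  "time_window": "30d",
  "diagnostic_level": "basic",
  "health_score": 7.5,
  "issues": [
    {
      "severity": "medium",
      "category": "performance",
      "description": "Long average execution time: 72.4 minutes",
      "recommendation": "Review query optimization opportunities",
      "affected_components": ["query_performance"]
    }
  ],
  "metrics": {
    "total_runs": 42,
    "successful_runs": 38,
    "success_rate": 0.905,
    "avg_duration_minutes": 72.4
  },
  "trends": {
    "success_rate_trend": "stable",
    "execution_time_trend": "stable",
    "resource_usage_trend": "unknown"
  },
  "recommendations": ["Add retry logic for transient failures"]
}
```

At the "comprehensive" level, `failure_analysis` and `optimization_opportunities` keys are added as well.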

Input Schema

Name                 Required  Default  Description
workflow_identifier  Yes       -        Workflow name, ID, or partial match
time_window          No        30d      Time period to analyze (e.g., "30d", "7d", "24h")
diagnostic_level     No        basic    "basic" for quick check, "comprehensive" for deep analysis
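
For example, a request asking for a deep analysis of the last week (the workflow name is illustrative):

```json
{
  "workflow_identifier": "daily_sales_rollup",
  "time_window": "7d",
  "diagnostic_level": "comprehensive"
}
```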

Implementation Reference

  • The core async handler function implementing td_diagnose_workflow. It analyzes Treasure Data workflow execution history within the specified time window; calculates success rates, durations, and failure patterns; scores workflow health on a 0-10 scale; identifies issues; and returns a structured diagnosis report with metrics, trends, prioritized recommendations, and optimization suggestions.
    async def td_diagnose_workflow(
        workflow_identifier: str,
        time_window: str = "30d",
        diagnostic_level: str = "basic",
    ) -> dict[str, Any]:
        """Health check for workflows - find why they're failing or slow.
    
        Automated troubleshooting that analyzes execution history to identify
        patterns, calculate health scores, and provide fix recommendations.
    
        Common scenarios:
        - Workflow suddenly failing - Find root cause
        - Performance degradation - Identify slow tasks
        - Reliability issues - Pattern analysis
        - Pre-deployment check - Ensure workflow health
        - Incident response - Quick failure diagnosis
    
        Time windows: "30d", "7d", "24h" for trend analysis
        Levels: "basic" (quick stats), "comprehensive" (full analysis)
    
        Returns health score (0-10), failure patterns, and prioritized fixes.
    
        Args:
            workflow_identifier: Workflow name, ID, or partial match
            time_window: Time period to analyze (e.g., "30d", "7d", "24h")
            diagnostic_level: "basic" for quick check, "comprehensive" for deep analysis
    
        Returns:
            Health report with score, issues, trends, and optimization recommendations
        """
        if not workflow_identifier or not workflow_identifier.strip():
            return _format_error_response("Workflow identifier cannot be empty")
    
        client = _create_client(include_workflow=True)
        if isinstance(client, dict):
            return client
    
        try:
            # Find the workflow
            workflows = client.get_workflows(count=1000, all_results=True)
            target_workflow = None
    
            # Try exact ID match first
            if re.match(r"^\d+$", workflow_identifier):
                for w in workflows:
                    if w.id == workflow_identifier:
                        target_workflow = w
                        break
    
            # Try name match
            if not target_workflow:
                workflow_lower = workflow_identifier.lower()
                for w in workflows:
                    if workflow_lower in w.name.lower():
                        target_workflow = w
                        break
    
            if not target_workflow:
                return _format_error_response(f"Workflow '{workflow_identifier}' not found")
    
            # Calculate time range
            start_time = _calculate_time_window(time_window)
    
            # Initialize diagnosis result
            result: dict[str, Any] = {
                "workflow": {
                    "id": target_workflow.id,
                    "name": target_workflow.name,
                    "project": target_workflow.project.name,
                    "timezone": target_workflow.timezone,
                    "scheduled": target_workflow.schedule is not None,
                },
                "time_window": time_window,
                "diagnostic_level": diagnostic_level,
            }
    
            # Add schedule info if available
            if target_workflow.schedule:
                result["workflow"]["schedule"] = target_workflow.schedule
    
            # Analyze session history
            sessions = []
            total_duration = 0.0
            successful_runs = 0
    
            for session in target_workflow.latest_sessions:
                session_time = _parse_datetime(session.session_time)
                if start_time and session_time and session_time < start_time:
                    continue  # Skip sessions outside time window
    
                session_info = {
                    "session_time": session.session_time,
                    "status": session.last_attempt.status,
                    "success": session.last_attempt.success,
                }
    
                # Calculate duration if finished
                if session.last_attempt.created_at and session.last_attempt.finished_at:
                    created = _parse_datetime(session.last_attempt.created_at)
                    finished = _parse_datetime(session.last_attempt.finished_at)
                    if created and finished:
                        duration = (finished - created).total_seconds()
                        session_info["duration_seconds"] = duration
                        total_duration += duration
    
                sessions.append(session_info)
                if session.last_attempt.success:
                    successful_runs += 1
    
            # Calculate metrics
            total_runs = len(sessions)
            success_rate = successful_runs / total_runs if total_runs > 0 else 0
            avg_duration = total_duration / successful_runs if successful_runs > 0 else 0
    
            # Analyze failure patterns
            failure_patterns = _analyze_failure_patterns(sessions)
    
            # Identify issues
            issues = []
    
            # High failure rate
            if success_rate < 0.8:
                issues.append(
                    {
                        "severity": "high",
                        "category": "error_rate",
                        "description": (
                            f"High failure rate: "
                            f"{(1 - success_rate) * 100:.1f}% of runs failing"
                        ),
                        "affected_components": ["workflow_execution"],
                    }
                )
    
            # Resource timeout patterns
            timeout_issues = [
                p for p in failure_patterns if p["type"] == "resource_timeout"
            ]
            if timeout_issues and timeout_issues[0]["count"] > 3:
                issues.append(
                    {
                        "severity": "high",
                        "category": "resource_management",
                        "description": "Frequent resource timeouts detected",
                        "recommendation": (
                            "Consider increasing memory allocation or optimizing queries"
                        ),
                        "affected_components": ["resource_allocation"],
                    }
                )
    
            # Performance degradation (would need historical data for proper trend)
            if avg_duration > 3600:  # > 1 hour average
                issues.append(
                    {
                        "severity": "medium",
                        "category": "performance",
                        "description": (
                            f"Long average execution time: {avg_duration / 60:.1f} minutes"
                        ),
                        "recommendation": "Review query optimization opportunities",
                        "affected_components": ["query_performance"],
                    }
                )
    
            # Calculate health score
            health_score = _calculate_health_score(
                success_rate, avg_duration, failure_patterns, result["workflow"]
            )
    
            # Build result
            result["health_score"] = round(health_score, 1)
            result["issues"] = issues
            result["metrics"] = {
                "total_runs": total_runs,
                "successful_runs": successful_runs,
                "success_rate": round(success_rate, 3),
                "avg_duration_minutes": round(avg_duration / 60, 1)
                if avg_duration > 0
                else 0,
            }
    
            # Add trends (simplified without historical data)
            result["trends"] = {
                "success_rate_trend": "stable",  # Would need historical comparison
                "execution_time_trend": "stable",
                "resource_usage_trend": "unknown",
            }
    
            # Add failure analysis for comprehensive level
            if diagnostic_level == "comprehensive":
                result["failure_analysis"] = {
                    "patterns": failure_patterns,
                    "recent_failures": [s for s in sessions if not s["success"]][:10],
                }
    
                # Add optimization opportunities
                optimization_opportunities = []
    
                if target_workflow.schedule and avg_duration > 1800:  # > 30 min
                    optimization_opportunities.append(
                        {
                            "type": "scheduling",
                            "description": (
                                "Consider adjusting schedule to avoid peak hours"
                            ),
                            "potential_impact": "Reduce resource contention",
                        }
                    )
    
                if len(failure_patterns) > 0:
                    optimization_opportunities.append(
                        {
                            "type": "error_handling",
                            "description": "Implement retry logic for transient failures",
                            "potential_impact": (
                                f"Could recover {failure_patterns[0]['count']} failed runs"
                            ),
                        }
                    )
    
                result["optimization_opportunities"] = optimization_opportunities
    
            # Generate recommendations
            recommendations = _generate_recommendations(
                health_score, issues, result["workflow"]
            )
            result["recommendations"] = recommendations
    
            return result
    
        except Exception as e:
            return _format_error_response(f"Failed to diagnose workflow: {str(e)}")
  • Function that sets up globals and applies the MCP tool decorator to td_diagnose_workflow, effectively registering it as an available tool.
    def register_diagnostic_tools(mcp_instance, create_client_func, format_error_func):
        """Register diagnostic tools with the provided MCP instance."""
        global mcp, _create_client, _format_error_response
        mcp = mcp_instance
        _create_client = create_client_func
        _format_error_response = format_error_func
    
        # Register all tools
        mcp.tool()(td_diagnose_workflow)
        mcp.tool()(td_trace_data_lineage)
  • Invocation of the registration function during MCP server initialization, making the td_diagnose_workflow tool available.
    diagnostic_tools.register_diagnostic_tools(mcp, _create_client, _format_error_response)
