Treasure Data MCP Server

by knishioka

td_diagnose_workflow

Diagnose workflow issues by analyzing execution history to identify failures and performance problems, then report a health score with fix recommendations.

Instructions

Health check for workflows - find why they're failing or slow.

Automated troubleshooting that analyzes execution history to identify
patterns, calculate health scores, and provide fix recommendations.

Common scenarios:
- Workflow suddenly failing - Find root cause
- Performance degradation - Identify slow tasks
- Reliability issues - Pattern analysis
- Pre-deployment check - Ensure workflow health
- Incident response - Quick failure diagnosis

Time windows: "30d", "7d", "24h" for trend analysis
Levels: "basic" (quick stats), "comprehensive" (full analysis)

Returns health score (0-10), failure patterns, and prioritized fixes.

Args:
    workflow_identifier: Workflow name, ID, or partial match
    time_window: Time period to analyze (e.g., "30d", "7d", "24h")
    diagnostic_level: "basic" for quick check, "comprehensive" for deep analysis

Returns:
    Health report with score, issues, trends, and optimization recommendations
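
The `workflow_identifier` argument accepts a name, an ID, or a partial match. The following is a minimal sketch of that resolution order, modeled on the handler listed under Implementation Reference: an all-digit identifier is tried as an exact ID first, then a case-insensitive substring match on the name. The `Workflow` dataclass and the `resolve_workflow` function are hypothetical stand-ins for illustration, not part of the server's API.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class Workflow:
    """Hypothetical stand-in for the client's workflow object."""

    id: str
    name: str


def resolve_workflow(identifier: str, workflows: list[Workflow]) -> Optional[Workflow]:
    """Return the first workflow matching by ID, else by name substring."""
    # An all-digit identifier is tried as an exact ID first.
    if re.fullmatch(r"\d+", identifier):
        for w in workflows:
            if w.id == identifier:
                return w

    # Otherwise fall back to a case-insensitive substring match on the name.
    needle = identifier.lower()
    for w in workflows:
        if needle in w.name.lower():
            return w
    return None


workflows = [
    Workflow(id="101", name="daily_sales_etl"),
    Workflow(id="102", name="daily_sales_report"),
]

print(resolve_workflow("102", workflows))      # exact ID match
print(resolve_workflow("SALES", workflows))    # first substring match wins
print(resolve_workflow("missing", workflows))  # no match
```

Because the first substring match wins, a partial name shared by several workflows resolves to whichever comes first in the list. Passing the full name or the numeric ID avoids that ambiguity.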

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| workflow_identifier | Yes | | |
| time_window | No | | 30d |
| diagnostic_level | No | | basic |
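
The `time_window` values (`"30d"`, `"7d"`, `"24h"`) have to be turned into a cutoff timestamp before sessions can be filtered. The server's own `_calculate_time_window` helper is not shown on this page, so the sketch below is only one plausible way to do it. The function name `window_start`, the accepted units (`h` and `d`), and the choice to return `None` for unrecognized input are all assumptions.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumed unit suffixes; the real helper may accept others.
_UNITS = {"h": "hours", "d": "days"}


def window_start(time_window: str, now: Optional[datetime] = None) -> Optional[datetime]:
    """Return the start of the window ending at `now`, or None if unparseable."""
    match = re.fullmatch(r"(\d+)([hd])", time_window.strip().lower())
    if match is None:
        return None
    amount, unit = int(match.group(1)), match.group(2)
    now = now or datetime.now(timezone.utc)
    return now - timedelta(**{_UNITS[unit]: amount})


# A fixed reference time keeps the example deterministic.
now = datetime(2024, 1, 31, 12, 0, tzinfo=timezone.utc)
print(window_start("30d", now))
print(window_start("24h", now))
print(window_start("soon", now))
```

Returning `None` for an unparseable value pairs naturally with a caller that skips time filtering when no start time is available, which is how the listed handler treats a falsy `start_time`.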

Implementation Reference

  • The core async handler function implementing td_diagnose_workflow. Analyzes Treasure Data workflow execution history within a specified time window, calculates success rates, durations, failure patterns, health scores (0-10), identifies issues, generates prioritized recommendations, and returns a structured diagnosis report with metrics, trends, and optimization suggestions.
    async def td_diagnose_workflow(
        workflow_identifier: str,
        time_window: str = "30d",
        diagnostic_level: str = "basic",
    ) -> dict[str, Any]:
        """Health check for workflows - find why they're failing or slow.
    
        Automated troubleshooting that analyzes execution history to identify
        patterns, calculate health scores, and provide fix recommendations.
    
        Common scenarios:
        - Workflow suddenly failing - Find root cause
        - Performance degradation - Identify slow tasks
        - Reliability issues - Pattern analysis
        - Pre-deployment check - Ensure workflow health
        - Incident response - Quick failure diagnosis
    
        Time windows: "30d", "7d", "24h" for trend analysis
        Levels: "basic" (quick stats), "comprehensive" (full analysis)
    
        Returns health score (0-10), failure patterns, and prioritized fixes.
    
        Args:
            workflow_identifier: Workflow name, ID, or partial match
            time_window: Time period to analyze (e.g., "30d", "7d", "24h")
            diagnostic_level: "basic" for quick check, "comprehensive" for deep analysis
    
        Returns:
            Health report with score, issues, trends, and optimization recommendations
        """
        if not workflow_identifier or not workflow_identifier.strip():
            return _format_error_response("Workflow identifier cannot be empty")
    
        client = _create_client(include_workflow=True)
        if isinstance(client, dict):
            return client
    
        try:
            # Find the workflow
            workflows = client.get_workflows(count=1000, all_results=True)
            target_workflow = None
    
            # Try exact ID match first
            if re.match(r"^\d+$", workflow_identifier):
                for w in workflows:
                    if w.id == workflow_identifier:
                        target_workflow = w
                        break
    
            # Try name match
            if not target_workflow:
                workflow_lower = workflow_identifier.lower()
                for w in workflows:
                    if workflow_lower in w.name.lower():
                        target_workflow = w
                        break
    
            if not target_workflow:
                return _format_error_response(f"Workflow '{workflow_identifier}' not found")
    
            # Calculate time range
            start_time = _calculate_time_window(time_window)
    
            # Initialize diagnosis result
            result: dict[str, Any] = {
                "workflow": {
                    "id": target_workflow.id,
                    "name": target_workflow.name,
                    "project": target_workflow.project.name,
                    "timezone": target_workflow.timezone,
                    "scheduled": target_workflow.schedule is not None,
                },
                "time_window": time_window,
                "diagnostic_level": diagnostic_level,
            }
    
            # Add schedule info if available
            if target_workflow.schedule:
                result["workflow"]["schedule"] = target_workflow.schedule
    
            # Analyze session history
            sessions = []
            total_duration = 0.0
            successful_runs = 0
    
            for session in target_workflow.latest_sessions:
                session_time = _parse_datetime(session.session_time)
                if start_time and session_time and session_time < start_time:
                    continue  # Skip sessions outside time window
    
                session_info = {
                    "session_time": session.session_time,
                    "status": session.last_attempt.status,
                    "success": session.last_attempt.success,
                }
    
                # Calculate duration if finished
                if session.last_attempt.created_at and session.last_attempt.finished_at:
                    created = _parse_datetime(session.last_attempt.created_at)
                    finished = _parse_datetime(session.last_attempt.finished_at)
                    if created and finished:
                        duration = (finished - created).total_seconds()
                        session_info["duration_seconds"] = duration
                        total_duration += duration
    
                sessions.append(session_info)
                if session.last_attempt.success:
                    successful_runs += 1
    
            # Calculate metrics
            total_runs = len(sessions)
            success_rate = successful_runs / total_runs if total_runs > 0 else 0
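            # Average over sessions with a recorded duration; total_duration also
            # includes failed runs, so dividing by successful_runs would skew it.
            timed_runs = sum(1 for s in sessions if "duration_seconds" in s)
            avg_duration = total_duration / timed_runs if timed_runs > 0 else 0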
    
            # Analyze failure patterns
            failure_patterns = _analyze_failure_patterns(sessions)
    
            # Identify issues
            issues = []
    
            # High failure rate
            if success_rate < 0.8:
                issues.append(
                    {
                        "severity": "high",
                        "category": "error_rate",
                        "description": (
                            f"High failure rate: "
                            f"{(1 - success_rate) * 100:.1f}% of runs failing"
                        ),
                        "affected_components": ["workflow_execution"],
                    }
                )
    
            # Resource timeout patterns
            timeout_issues = [
                p for p in failure_patterns if p["type"] == "resource_timeout"
            ]
            if timeout_issues and timeout_issues[0]["count"] > 3:
                issues.append(
                    {
                        "severity": "high",
                        "category": "resource_management",
                        "description": "Frequent resource timeouts detected",
                        "recommendation": (
                            "Consider increasing memory allocation or optimizing queries"
                        ),
                        "affected_components": ["resource_allocation"],
                    }
                )
    
            # Performance degradation (would need historical data for proper trend)
            if avg_duration > 3600:  # > 1 hour average
                issues.append(
                    {
                        "severity": "medium",
                        "category": "performance",
                        "description": (
                            f"Long average execution time: {avg_duration / 60:.1f} minutes"
                        ),
                        "recommendation": "Review query optimization opportunities",
                        "affected_components": ["query_performance"],
                    }
                )
    
            # Calculate health score
            health_score = _calculate_health_score(
                success_rate, avg_duration, failure_patterns, result["workflow"]
            )
    
            # Build result
            result["health_score"] = round(health_score, 1)
            result["issues"] = issues
            result["metrics"] = {
                "total_runs": total_runs,
                "successful_runs": successful_runs,
                "success_rate": round(success_rate, 3),
                "avg_duration_minutes": round(avg_duration / 60, 1)
                if avg_duration > 0
                else 0,
            }
    
            # Add trends (simplified without historical data)
            result["trends"] = {
                "success_rate_trend": "stable",  # Would need historical comparison
                "execution_time_trend": "stable",
                "resource_usage_trend": "unknown",
            }
    
            # Add failure analysis for comprehensive level
            if diagnostic_level == "comprehensive":
                result["failure_analysis"] = {
                    "patterns": failure_patterns,
                    "recent_failures": [s for s in sessions if not s["success"]][:10],
                }
    
                # Add optimization opportunities
                optimization_opportunities = []
    
                if target_workflow.schedule and avg_duration > 1800:  # > 30 min
                    optimization_opportunities.append(
                        {
                            "type": "scheduling",
                            "description": (
                                "Consider adjusting schedule to avoid peak hours"
                            ),
                            "potential_impact": "Reduce resource contention",
                        }
                    )
    
                if len(failure_patterns) > 0:
                    optimization_opportunities.append(
                        {
                            "type": "error_handling",
                            "description": "Implement retry logic for transient failures",
                            "potential_impact": (
                                f"Could recover {failure_patterns[0]['count']} failed runs"
                            ),
                        }
                    )
    
                result["optimization_opportunities"] = optimization_opportunities
    
            # Generate recommendations
            recommendations = _generate_recommendations(
                health_score, issues, result["workflow"]
            )
            result["recommendations"] = recommendations
    
            return result
    
        except Exception as e:
            return _format_error_response(f"Failed to diagnose workflow: {e}")
  • Function that sets up globals and applies the MCP tool decorator to td_diagnose_workflow, effectively registering it as an available tool.
    def register_diagnostic_tools(mcp_instance, create_client_func, format_error_func):
        """Register diagnostic tools with the provided MCP instance."""
        global mcp, _create_client, _format_error_response
        mcp = mcp_instance
        _create_client = create_client_func
        _format_error_response = format_error_func
    
        # Register all tools
        mcp.tool()(td_diagnose_workflow)
        mcp.tool()(td_trace_data_lineage)
  • Invocation of the registration function during MCP server initialization, making the td_diagnose_workflow tool available.
    diagnostic_tools.register_diagnostic_tools(mcp, _create_client, _format_error_response)
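
The metrics step of the handler can be tried in isolation. The sketch below takes session dictionaries shaped like the handler's `session_info` entries (`status`, `success`, optional `duration_seconds`) and produces the same four metric fields. The function name `summarize_sessions` and the sample data are made up for illustration. Note that this sketch averages duration over every session that reports one, which is a choice made here rather than something the page specifies.

```python
from typing import Any


def summarize_sessions(sessions: list[dict[str, Any]]) -> dict[str, float]:
    """Compute run counts, success rate, and average duration in minutes."""
    total_runs = len(sessions)
    successful_runs = sum(1 for s in sessions if s.get("success"))

    # Only sessions that finished carry a duration.
    durations = [s["duration_seconds"] for s in sessions if "duration_seconds" in s]

    success_rate = successful_runs / total_runs if total_runs else 0.0
    avg_duration = sum(durations) / len(durations) if durations else 0.0

    return {
        "total_runs": total_runs,
        "successful_runs": successful_runs,
        "success_rate": round(success_rate, 3),
        "avg_duration_minutes": round(avg_duration / 60, 1),
    }


# Invented sample data: two successes, one failure, one run still in progress.
sessions = [
    {"status": "success", "success": True, "duration_seconds": 600.0},
    {"status": "success", "success": True, "duration_seconds": 1200.0},
    {"status": "error", "success": False, "duration_seconds": 300.0},
    {"status": "running", "success": False},
]

summary = summarize_sessions(sessions)
print(summary)
```

With this sample the success rate is 0.5, which falls below the handler's 0.8 threshold and would therefore be reported as a high-severity `error_rate` issue.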
Behavior: 3/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries the full burden of behavioral disclosure. It describes the tool's purpose and output (health score, failure patterns, fixes) but lacks details on permissions, rate limits, or side effects. It adds some context like automated troubleshooting and trend analysis, but does not fully cover behavioral traits such as execution time or error handling.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is well-structured with a clear purpose statement, usage scenarios, parameter explanations, and return details. It is front-loaded with key information and uses bullet points for scenarios, making it efficient. However, some redundancy exists (e.g., repeating parameter info in 'Args' section), slightly reducing conciseness.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 3/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's complexity (diagnostic analysis with 3 parameters), no annotations, and no output schema, the description is moderately complete. It covers purpose, usage, parameters, and returns, but lacks details on output structure, error cases, or integration with sibling tools. For a diagnostic tool, more behavioral and output specifics would enhance completeness.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.
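
Since the tool has no output schema, it may help to see the report's shape written down. The sketch below is derived from the keys the handler under Implementation Reference assigns to its `result` dictionary. The type names (`DiagnosisReport`, `Metrics`, `Trends`) are hypothetical, the shape of `recommendations` is left open because `_generate_recommendations` is not shown, and the error response produced by `_format_error_response` is not covered. The example values are invented.

```python
from typing import Any, TypedDict


class Metrics(TypedDict):
    total_runs: int
    successful_runs: int
    success_rate: float
    avg_duration_minutes: float


class Trends(TypedDict):
    success_rate_trend: str
    execution_time_trend: str
    resource_usage_trend: str


class DiagnosisReport(TypedDict, total=False):
    workflow: dict[str, Any]          # id, name, project, timezone, scheduled
    time_window: str
    diagnostic_level: str
    health_score: float               # 0-10, rounded to one decimal
    issues: list[dict[str, Any]]      # severity, category, description, ...
    metrics: Metrics
    trends: Trends
    recommendations: Any              # shape not shown in the source
    # Present only when diagnostic_level == "comprehensive":
    failure_analysis: dict[str, Any]
    optimization_opportunities: list[dict[str, Any]]


# Invented values, for illustration only.
example: DiagnosisReport = {
    "workflow": {"id": "101", "name": "daily_sales_etl", "scheduled": True},
    "time_window": "7d",
    "diagnostic_level": "basic",
    "health_score": 8.5,
    "issues": [],
    "metrics": {
        "total_runs": 10,
        "successful_runs": 9,
        "success_rate": 0.9,
        "avg_duration_minutes": 12.3,
    },
    "trends": {
        "success_rate_trend": "stable",
        "execution_time_trend": "stable",
        "resource_usage_trend": "unknown",
    },
    "recommendations": [],
}

print(sorted(example))
```

In the listed handler the `trends` values are fixed placeholders (`"stable"`, `"stable"`, `"unknown"`), so callers should not read them as measured trends.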

Parameters: 4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 0%, so the description must compensate. It explains each parameter's semantics: workflow_identifier accepts 'name, ID, or partial match,' time_window specifies periods like '30d,' and diagnostic_level defines 'basic' vs. 'comprehensive.' This adds meaningful context beyond the schema's basic titles, though it could detail format constraints more explicitly.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool performs health checks for workflows to diagnose failures or slowness, using specific verbs like 'analyzes execution history,' 'identify patterns,' and 'calculate health scores.' It distinguishes itself from siblings like td_get_workflow or td_list_workflows by focusing on diagnostic analysis rather than retrieval or listing.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 4/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides explicit usage scenarios (e.g., 'Workflow suddenly failing,' 'Performance degradation') and mentions time windows and diagnostic levels, offering clear context for when to use it. However, it does not explicitly state when not to use it or name specific alternatives among siblings, such as td_analyze_execution, which might overlap in functionality.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/knishioka/td-mcp-server'
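
The same request can be issued from Python with the standard library. This is a sketch only: the helper names `build_request` and `fetch_server_info` are made up here, and treating the response body as JSON is an assumption, since the response format is not described on this page.

```python
import json
import urllib.request

API_URL = "https://glama.ai/api/mcp/v1/servers/knishioka/td-mcp-server"


def build_request(url: str = API_URL) -> urllib.request.Request:
    """Build the GET request equivalent to the curl command."""
    return urllib.request.Request(url, method="GET")


def fetch_server_info(url: str = API_URL, timeout: float = 10.0) -> dict:
    """Send the request and decode the body, assuming it is JSON."""
    with urllib.request.urlopen(build_request(url), timeout=timeout) as response:
        return json.load(response)


# Building the request does not touch the network.
request = build_request()
print(request.get_method(), request.full_url)

# Uncomment to perform the actual call:
# info = fetch_server_info()
# print(sorted(info))
```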

If you have feedback or need assistance with the MCP directory API, please join our Discord server.