Treasure Data MCP Server

by knishioka

td_analyze_execution

Analyzes a workflow execution for debugging, given a console URL or a session/attempt ID. Identifies failures and slow tasks, and offers actionable recommendations for resolving execution issues.

Instructions

Analyze workflow execution from console URL or ID - best for debugging.

Smart analysis tool that accepts URLs from error alerts or IDs. Automatically
detects type and provides comprehensive execution analysis with recommendations.

Accepts:
- Console URLs (e.g., https://console.../app/sessions/123456)
- Session IDs (e.g., 123456789)
- Attempt IDs (e.g., 987654321)

Common scenarios:
- Someone shares a workflow URL in Slack during an incident
- Quick analysis when you only have an ID from logs
- One-stop debugging for any execution issue

Returns analysis with failures, slow tasks, and actionable recommendations.
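
The type detection described above can be sketched as a small standalone function. This is an illustrative sketch rather than the server's own code: the name classify_input is hypothetical, the host console.example.com is a placeholder, and offering a bare number as both a session and an attempt candidate is an assumption about how the ambiguity might be resolved.

```python
import re
from typing import Optional, Tuple


def classify_input(url_or_id: str) -> Tuple[Optional[str], Optional[str]]:
    """Return (session_id, attempt_id) candidates from a console URL or bare ID."""
    text = url_or_id.strip()
    session_match = re.search(r"/sessions/(\d+)", text)
    attempt_match = re.search(r"/attempts/(\d+)", text)
    if session_match or attempt_match:
        # A URL names its own type, so only the matching ID is returned
        return (
            session_match.group(1) if session_match else None,
            attempt_match.group(1) if attempt_match else None,
        )
    if text.isdigit():
        # A bare number is ambiguous, so offer it as both candidates
        return text, text
    return None, None


# Placeholder host; real console URLs will differ
print(classify_input("https://console.example.com/app/sessions/123456"))
print(classify_input("987654321"))
```

A caller would then try the session candidate first and fall back to the attempt candidate only if no session is found.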

Input Schema

Name       Required  Description  Default
url_or_id  Yes
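
Since the tool takes a single required string argument, a call might look like the request below. This is a sketch that assumes the standard MCP JSON-RPC "tools/call" method; the request id and the sample session ID are arbitrary illustrative values, and most MCP client libraries build this envelope for you.

```python
import json

# Hypothetical request envelope; only "url_or_id" comes from the schema above
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "td_analyze_execution",
        "arguments": {"url_or_id": "123456789"},
    },
}

print(json.dumps(request, indent=2))
```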

Implementation Reference

  • The td_analyze_execution tool handler function. Analyzes a workflow execution given a console URL or session/attempt ID. Extracts IDs from URLs, fetches session or attempt details, retrieves tasks, identifies failures and long-running tasks, and generates recommendations.
    async def td_analyze_execution(url_or_id: str) -> dict[str, Any]:
        """Analyze workflow execution from console URL or ID - best for debugging.
    
        Smart analysis tool that accepts URLs from error alerts or IDs. Automatically
        detects type and provides comprehensive execution analysis with recommendations.
    
        Accepts:
        - Console URLs (e.g., https://console.../app/sessions/123456)
        - Session IDs (e.g., 123456789)
        - Attempt IDs (e.g., 987654321)
    
        Common scenarios:
        - Someone shares a workflow URL in Slack during an incident
        - Quick analysis when you only have an ID from logs
        - One-stop debugging for any execution issue
    
        Returns analysis with failures, slow tasks, and actionable recommendations.
        """
        if not url_or_id or not url_or_id.strip():
            return _format_error_response("URL or ID cannot be empty")
    
        import re
    
        # Try to extract IDs from URLs
        session_id = None
        attempt_id = None
    
        # Check for session URL
        session_match = re.search(r"/sessions/(\d+)", url_or_id)
        if session_match:
            session_id = session_match.group(1)
        # Check for attempt URL
        attempt_match = re.search(r"/attempts/(\d+)", url_or_id)
        if attempt_match:
            attempt_id = attempt_match.group(1)
        # If no URL pattern, assume it's a direct ID
        if not (session_match or attempt_match) and url_or_id.strip().isdigit():
            # A bare ID could be either kind: try it as a session first and
            # fall back to an attempt lookup if no session is found
            session_id = attempt_id = url_or_id.strip()
    
        client = _create_client(include_workflow=True)
        if isinstance(client, dict):
            return client
    
        try:
            analysis = {"input": url_or_id}
    
            # Try session first
            if session_id:
                session = client.get_session(session_id)
                if session:
                    analysis["type"] = "session"
                    analysis["session"] = {
                        "id": session.id,
                        "workflow": session.workflow["name"],
                        "project": session.project["name"],
                        "session_time": session.session_time,
                        "status": session.last_attempt.status,
                        "success": session.last_attempt.success,
                    }
    
                    # Get attempt details
                    attempt_id = session.last_attempt.id
                    analysis["latest_attempt_id"] = attempt_id
    
            # If not a session or if we have an attempt ID, try attempt
            if attempt_id and "type" not in analysis:
                attempt = client.get_attempt(attempt_id)
                if attempt:
                    analysis["type"] = "attempt"
                    analysis["attempt"] = {
                        "id": attempt.id,
                        "workflow": attempt.workflow["name"],
                        "project": attempt.project["name"],
                        "session_time": attempt.session_time,
                        "status": attempt.status,
                        "success": attempt.success,
                    }
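                    attempt_id = attempt.id
                else:
                    # No matching attempt: clear the ID so the not-found
                    # error below is returned instead of fetching tasks
                    attempt_id = None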
    
            # If we found something, get tasks for detailed analysis
            if attempt_id:
                tasks = client.get_attempt_tasks(attempt_id)
    
                # Analyze task execution
                failed_tasks = []
                long_running_tasks = []
                task_tree = {}
    
                for task in tasks:
                    # Build task tree
                    if task.parent_id:
                        if task.parent_id not in task_tree:
                            task_tree[task.parent_id] = []
                        task_tree[task.parent_id].append(task.full_name)
    
                    # Track failures
                    if task.state in ["failed", "error"]:
                        failed_tasks.append(
                            {
                                "name": task.full_name,
                                "state": task.state,
                                "error": task.error if task.error else "No error details",
                            }
                        )
    
                    # Track long-running tasks
                    if task.started_at and task.updated_at:
                        try:
                            from datetime import datetime
    
                            started = datetime.fromisoformat(
                                task.started_at.replace("Z", "+00:00")
                            )
                            updated = datetime.fromisoformat(
                                task.updated_at.replace("Z", "+00:00")
                            )
                            duration = (updated - started).total_seconds()
                            if duration > 300:  # Tasks longer than 5 minutes
                                long_running_tasks.append(
                                    {
                                        "name": task.full_name,
                                        "duration_seconds": duration,
                                    }
                                )
                        except Exception:
                            pass
    
                analysis["task_analysis"] = {
                    "total_tasks": len(tasks),
                    "failed_tasks": failed_tasks,
                    "long_running_tasks": sorted(
                        long_running_tasks,
                        key=lambda x: x["duration_seconds"],
                        reverse=True,
                    )[:5],  # Top 5 longest
                }
    
                # Add recommendations
                recommendations = []
                if failed_tasks:
                    recommendations.append(
                        f"Found {len(failed_tasks)} failed task(s). "
                        "Check error details above."
                    )
                if long_running_tasks:
                    recommendations.append(
                        f"Found {len(long_running_tasks)} task(s) running longer than "
                        "5 minutes. Consider optimization."
                    )
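                # Check whichever record was resolved (session or attempt)
                execution = analysis.get("session") or analysis.get("attempt") or {}
                if not execution.get("success", True):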
                    recommendations.append(
                        "Workflow failed. Use td_get_attempt_tasks for detailed "
                        "task breakdown."
                    )
    
                if recommendations:
                    analysis["recommendations"] = recommendations
    
                return analysis
            else:
                return _format_error_response(
                    f"Could not find session or attempt with identifier: {url_or_id}"
                )
    
        except Exception as e:
            return _format_error_response(f"Failed to analyze execution: {str(e)}")
  • Registers the td_analyze_execution tool using the mcp.tool() decorator within the register_execution_tools function.
    # Register all tools
    mcp.tool()(td_get_session)
    mcp.tool()(td_list_sessions)
    mcp.tool()(td_get_attempt)
    mcp.tool()(td_get_attempt_tasks)
    mcp.tool()(td_analyze_execution)
  • Invokes register_execution_tools from execution_tools module to register all execution tools, including td_analyze_execution, with the MCP server instance.
    # Register workflow execution tools
    execution_tools.register_execution_tools(mcp, _create_client, _format_error_response)
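
The long-running task check in the handler can be tried in isolation with the sketch below. It mirrors the handler's approach (ISO 8601 timestamps with a trailing "Z", a 300-second threshold, the five longest tasks), but the helper name task_duration_seconds, the constant LONG_RUNNING_SECONDS, and the sample task records are hypothetical and not taken from a real workflow.

```python
from datetime import datetime

LONG_RUNNING_SECONDS = 300  # same 5-minute threshold the handler uses


def task_duration_seconds(started_at: str, updated_at: str) -> float:
    """Seconds between two ISO 8601 timestamps that may end in 'Z'."""
    # Replace "Z" with an explicit offset, which fromisoformat accepts
    # on all supported Python versions
    started = datetime.fromisoformat(started_at.replace("Z", "+00:00"))
    updated = datetime.fromisoformat(updated_at.replace("Z", "+00:00"))
    return (updated - started).total_seconds()


# Made-up task records for illustration only
sample_tasks = [
    {"name": "+load", "started_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-01T00:01:30Z"},
    {"name": "+export", "started_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-01T00:06:00Z"},
    {"name": "+transform", "started_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-01T00:10:00Z"},
]

long_running = []
for task in sample_tasks:
    duration = task_duration_seconds(task["started_at"], task["updated_at"])
    if duration > LONG_RUNNING_SECONDS:
        long_running.append({"name": task["name"], "duration_seconds": duration})

# Longest first, capped at five entries
long_running = sorted(long_running, key=lambda t: t["duration_seconds"], reverse=True)[:5]
print(long_running)
```

Note that the duration is measured from started_at to updated_at, so for a task that is still running it reflects the time up to its last update rather than a final runtime.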

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/knishioka/td-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server