Treasure Data MCP Server

by knishioka

td_get_workflow

Retrieve workflow details by ID for debugging, project context, and execution status checks in Treasure Data.

Instructions

Get workflow details using numeric ID - essential for console URLs.

Direct workflow lookup when you have the ID. Handles large workflow IDs
that exceed pagination limits. Returns project info and execution history.

Common scenarios:
- Extracting ID from console URL (../workflows/12345678/info)
- Looking up workflow from error logs containing ID
- Getting project context for a known workflow ID
- Checking execution status by workflow ID
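
The first scenario, pulling the ID out of a console URL, could be sketched roughly as follows. The helper name `extract_workflow_id` and the regular expression are illustrative assumptions and not part of the server; the URL shape follows the `../workflows/12345678/info` pattern listed above.

```python
import re
from typing import Optional

# Hypothetical pattern: "/workflows/<digits>" followed by "/", "?", "#", or end of string.
_WORKFLOW_URL_RE = re.compile(r"/workflows/([0-9]+)(?:[/?#]|$)")


def extract_workflow_id(url: str) -> Optional[str]:
    """Return the numeric workflow ID embedded in a console URL, or None."""
    match = _WORKFLOW_URL_RE.search(url)
    return match.group(1) if match else None


url = "https://console.treasuredata.com/app/workflows/12345678/info"
print(extract_workflow_id(url))
```

The ID is kept as a string because the tool's `workflow_id` parameter is a string.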

Returns workflow name, project details, schedule, and recent runs.
Includes console URL for quick browser access.
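
As a rough illustration of how a caller might use the result, the sketch below condenses a response dictionary into one line. The keys (`workflow`, `latest_sessions`, `console_url`) mirror the handler shown in the Implementation Reference; the helper name `summarize_workflow` and all sample values are invented for the example, and the assumption that the first session is the most recent is not confirmed by the source.

```python
from typing import Any


def summarize_workflow(result: dict[str, Any]) -> str:
    """Build a one-line summary from a td_get_workflow-style result."""
    workflow = result.get("workflow")
    if workflow is None:
        # Error responses have a different shape that is not shown in the source.
        return "no workflow in response"

    sessions = workflow.get("latest_sessions", [])
    # Assumption: the first entry is the most recent session.
    last_status = sessions[0]["status"] if sessions else "no recent sessions"
    return (
        f"{workflow['name']} (project {workflow['project']['name']}): "
        f"{last_status}, see {result.get('console_url', 'n/a')}"
    )


# Sample data invented for illustration only.
sample = {
    "type": "workflow",
    "workflow": {
        "id": "12345678",
        "name": "daily_etl",
        "project": {"id": "1001", "name": "analytics"},
        "timezone": "UTC",
        "scheduled": False,
        "latest_sessions": [
            {"session_time": "2024-01-01T00:00:00+00:00", "status": "success", "success": True},
        ],
    },
    "console_url": "https://console.treasuredata.com/app/workflows/12345678/info",
}
print(summarize_workflow(sample))
```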

Input Schema

Name | Required | Description | Default
workflow_id | Yes | - | -
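
The only argument is `workflow_id`, which the handler declares as a string and requires to be all digits. A minimal sketch of checking arguments before calling the tool might look like this; the schema dictionary is an assumption inferred from the handler signature rather than copied from the server, and `validate_arguments` is a hypothetical helper.

```python
import re
from typing import Any

# Assumed schema, inferred from `workflow_id: str` in the handler signature.
ASSUMED_INPUT_SCHEMA: dict[str, Any] = {
    "type": "object",
    "properties": {"workflow_id": {"type": "string"}},
    "required": ["workflow_id"],
}


def validate_arguments(arguments: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the arguments look valid."""
    problems: list[str] = []
    for name in ASSUMED_INPUT_SCHEMA["required"]:
        if name not in arguments:
            problems.append(f"missing required argument: {name}")

    workflow_id = arguments.get("workflow_id")
    if workflow_id is not None:
        if not isinstance(workflow_id, str):
            problems.append("workflow_id must be a string, e.g. '12345678'")
        elif not re.fullmatch(r"[0-9]+", workflow_id):
            problems.append("workflow_id must contain only digits")
    return problems


print(validate_arguments({"workflow_id": "12345678"}))
print(validate_arguments({"workflow_id": 12345678}))
```

Passing the ID as a string rather than an integer matches the handler's type hint and avoids any precision concerns with very large IDs in JSON clients.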

Implementation Reference

  • Core handler function implementing the td_get_workflow tool. It validates that the ID is non-empty and numeric, tries a direct API lookup, and falls back to scanning all workflows if the direct lookup returns nothing. A successful response includes project info, schedule details, up to five recent sessions, and the console URL; failures are returned as formatted error responses.
    import re
    from typing import Any

    async def td_get_workflow(workflow_id: str) -> dict[str, Any]:
        """Get workflow details using numeric ID - essential for console URLs.
    
        Direct workflow lookup when you have the ID. Handles large workflow IDs
        that exceed pagination limits. Returns project info and execution history.
    
        Common scenarios:
        - Extracting ID from console URL (../workflows/12345678/info)
        - Looking up workflow from error logs containing ID
        - Getting project context for a known workflow ID
        - Checking execution status by workflow ID
    
        Returns workflow name, project details, schedule, and recent runs.
        Includes console URL for quick browser access.
        """
        if not workflow_id or not workflow_id.strip():
            return _format_error_response("Workflow ID cannot be empty")
    
        # Validate workflow ID format: ASCII digits only.
        # fullmatch also rejects a trailing newline, which "^\d+$" would accept.
        if not re.fullmatch(r"[0-9]+", workflow_id):
            return _format_error_response("Invalid workflow ID format. Must be numeric.")
    
        client = _create_client(include_workflow=True)
        if isinstance(client, dict):
            return client
    
        try:
            # First try the direct API endpoint
            workflow = client.get_workflow_by_id(workflow_id)
    
            if workflow:
                # Found the workflow via direct API
                result: dict[str, Any] = {
                    "type": "workflow",
                    "workflow": {
                        "id": workflow.id,
                        "name": workflow.name,
                        "project": {
                            "id": workflow.project.id,
                            "name": workflow.project.name,
                        },
                        "timezone": workflow.timezone,
                        "scheduled": workflow.schedule is not None,
                    },
                }
    
                # Add schedule info if available
                if workflow.schedule:
                    result["workflow"]["schedule"] = workflow.schedule
    
                # Add latest session info if available
                # Note: Direct API might not include session info
                if workflow.latest_sessions:
                    latest_sessions = []
                    for session in workflow.latest_sessions[:5]:  # Last 5 sessions
                        latest_sessions.append(
                            {
                                "session_time": session.session_time,
                                "status": session.last_attempt.status,
                                "success": session.last_attempt.success,
                            }
                        )
                    result["workflow"]["latest_sessions"] = latest_sessions
    
                # Construct console URL
                result["console_url"] = (
                    f"https://console.treasuredata.com/app/workflows/{workflow_id}/info"
                )
    
                return result
    
            # If not found via direct API, fall back to searching through all workflows
            # This might be needed for workflows accessible via console API only
            workflows = client.get_workflows(count=1000, all_results=True)
    
            for workflow in workflows:
                # Compare as strings so the match works whether the API returns str or int IDs
                if str(workflow.id) == workflow_id:
                    # Found the workflow
                    result = {
                        "type": "workflow",
                        "workflow": {
                            "id": workflow.id,
                            "name": workflow.name,
                            "project": {
                                "id": workflow.project.id,
                                "name": workflow.project.name,
                            },
                            "timezone": workflow.timezone,
                            "scheduled": workflow.schedule is not None,
                        },
                    }
    
                    # Add schedule info if available
                    if workflow.schedule:
                        result["workflow"]["schedule"] = workflow.schedule
    
                    # Add latest session info if available
                    if workflow.latest_sessions:
                        latest_sessions = []
                        for session in workflow.latest_sessions[:5]:  # Last 5 sessions
                            latest_sessions.append(
                                {
                                    "session_time": session.session_time,
                                    "status": session.last_attempt.status,
                                    "success": session.last_attempt.success,
                                }
                            )
                        result["workflow"]["latest_sessions"] = latest_sessions
    
                    # Construct console URL
                    result["console_url"] = (
                        f"https://console.treasuredata.com/app/workflows/{workflow_id}/info"
                    )
    
                    return result
    
            return _format_error_response(f"Workflow with ID '{workflow_id}' not found")
    
        except Exception as e:
            return _format_error_response(f"Failed to get workflow: {e}")
  • Registers the td_get_workflow tool by applying the mcp.tool() decorator inside the register_url_tools function.
    mcp.tool()(td_get_workflow)
  • Top-level registration call that invokes url_tools.register_url_tools(mcp, _create_client, _format_error_response), which in turn registers td_get_workflow and other URL tools.
    url_tools.register_url_tools(mcp, _create_client, _format_error_response)
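
The registration chain above passes the shared helpers into `register_url_tools`, which then applies `mcp.tool()` to each handler. The sketch below imitates that pattern with a stand-in `FakeMCP` class so it runs without the MCP SDK. `FakeMCP`, the stub helpers, the `{"error": ...}` response shape, and the simplified handler body are all assumptions for illustration and not the server's actual code; treating the handler as a closure over the injected helpers is likewise a guess based on the call signature.

```python
import asyncio
from typing import Any, Callable


class FakeMCP:
    """Stand-in for an MCP server object; only mimics a `tool()` decorator factory."""

    def __init__(self) -> None:
        self.tools: dict[str, Callable[..., Any]] = {}

    def tool(self) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            # Record the handler under its function name and return it unchanged.
            self.tools[func.__name__] = func
            return func

        return decorator


def register_url_tools(mcp, create_client, format_error_response) -> None:
    """Define handlers as closures over the injected helpers, then register them."""

    async def td_get_workflow(workflow_id: str) -> dict[str, Any]:
        if not workflow_id or not workflow_id.strip():
            return format_error_response("Workflow ID cannot be empty")
        client = create_client(include_workflow=True)
        return {"type": "workflow", "workflow": {"id": workflow_id, "client": client}}

    # Same shape as the call in the source: mcp.tool() returns a decorator.
    mcp.tool()(td_get_workflow)


# Stub helpers; the real ones live in the server module.
def _create_client(include_workflow: bool = False) -> str:
    return "stub-client"


def _format_error_response(message: str) -> dict[str, str]:
    return {"error": message}


mcp = FakeMCP()
register_url_tools(mcp, _create_client, _format_error_response)
print(sorted(mcp.tools))
print(asyncio.run(mcp.tools["td_get_workflow"]("")))
```

Injecting `_create_client` and `_format_error_response` this way would let the tool module stay free of imports from the main server module, which is one plausible reason for the design.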
