Treasure Data MCP Server

by knishioka

td_trace_data_lineage

Trace data dependencies to analyze impact, identify sources, and map data flow between tables and projects in Treasure Data.

Instructions

Map data flow to see what feeds into or depends on tables/projects.

Critical for understanding data dependencies and impact analysis.
Traces through SQL queries to build dependency graph.

Common scenarios:
- "What happens if I change this table?" - Impact analysis
- "Where does this data come from?" - Source tracing
- Data quality issues - Track upstream problems
- Migration planning - Understand dependencies
- Documentation - Data flow diagrams

Directions:
- upstream: What tables/projects feed INTO this
- downstream: What tables/projects CONSUME this
- both: Complete dependency graph

Returns visual-ready dependency tree with table/project relationships.
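Since the returned nodes and edges are meant to be visual-ready, they can be rendered directly, for example as Graphviz DOT text. The helper below is an illustrative sketch, not part of the server:

```python
def lineage_to_dot(lineage: dict) -> str:
    """Render the tool's nodes/edges structure as Graphviz DOT text."""
    lines = ["digraph lineage {"]
    for node in lineage["nodes"]:
        # Each node's id is unique; label it with its human-readable name.
        lines.append(f'  "{node["id"]}" [label="{node["name"]}"];')
    for edge in lineage["edges"]:
        lines.append(f'  "{edge["from"]}" -> "{edge["to"]}" [label="{edge["type"]}"];')
    lines.append("}")
    return "\n".join(lines)

# Example input mirroring the project-branch output shape below.
lineage = {
    "nodes": [
        {"id": "proj-1", "type": "project", "name": "etl", "level": 0},
        {"id": "workflow_42", "type": "workflow", "name": "daily_load", "level": 1},
    ],
    "edges": [{"from": "proj-1", "to": "workflow_42", "type": "contains"}],
}
print(lineage_to_dot(lineage))
```

The resulting text can be fed to `dot -Tpng` to produce a data flow diagram.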

Args:
    table_or_project: Table name (format: "database.table") or project name/ID
    direction: "upstream" (sources), "downstream" (consumers), or "both"
    max_depth: Maximum levels to trace (default: 3)

Returns:
    Data lineage graph with dependencies and data flow information
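For illustration, a minimal instance of that return value for a table query might look like the following (the table name is a placeholder, not taken from the source):

```python
# Hypothetical return value for a table-based lineage query.
example_result = {
    "query": "analytics.events",   # placeholder "database.table" name
    "direction": "both",
    "max_depth": 3,
    "lineage": {
        "nodes": [
            {"id": "analytics.events", "type": "table",
             "name": "events", "database": "analytics", "level": 0},
        ],
        "edges": [],
    },
    "summary": {
        "referencing_workflows": [],
        "total_references": 0,
    },
}
```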

Input Schema

Name              Required  Description                                        Default
table_or_project  Yes       Table name ("database.table") or project name/ID  -
direction         No        "upstream", "downstream", or "both"                both
max_depth         No        Maximum levels to trace                            3
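The `table_or_project` argument is disambiguated by the presence of a dot, as the implementation below does. A minimal sketch of that convention (the helper name is illustrative):

```python
def classify_identifier(table_or_project: str) -> str:
    """Classify input the way the handler does: a dot means
    "database.table"; anything else is a project name or numeric ID."""
    return "table" if "." in table_or_project else "project"

classify_identifier("analytics.events")  # "table"
classify_identifier("12345")             # "project"
```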

Implementation Reference

  • The main handler for the td_trace_data_lineage tool. It traces data lineage for a given table or project, building a dependency graph of nodes and edges. Upstream/downstream tracing is a simplified implementation based on available metadata and naming heuristics. The handler includes input validation, client interaction, and structured output.
    import re
    from typing import Any

    # _create_client and _format_error_response are module-level helpers
    # defined elsewhere in this server module (not shown in this excerpt).
    async def td_trace_data_lineage(
        table_or_project: str,
        direction: str = "both",
        max_depth: int = 3,
    ) -> dict[str, Any]:
        """Map data flow to see what feeds into or depends on tables/projects.
    
        Critical for understanding data dependencies and impact analysis.
        Traces through SQL queries to build dependency graph.
    
        Common scenarios:
        - "What happens if I change this table?" - Impact analysis
        - "Where does this data come from?" - Source tracing
        - Data quality issues - Track upstream problems
        - Migration planning - Understand dependencies
        - Documentation - Data flow diagrams
    
        Directions:
        - upstream: What tables/projects feed INTO this
        - downstream: What tables/projects CONSUME this
        - both: Complete dependency graph
    
        Returns visual-ready dependency tree with table/project relationships.
    
        Args:
            table_or_project: Table name (format: "database.table") or project name/ID
            direction: "upstream" (sources), "downstream" (consumers), or "both"
            max_depth: Maximum levels to trace (default: 3)
    
        Returns:
            Data lineage graph with dependencies and data flow information
        """
        if not table_or_project or not table_or_project.strip():
            return _format_error_response("Table or project identifier cannot be empty")
    
        if direction not in ["upstream", "downstream", "both"]:
            return _format_error_response(
                "Direction must be 'upstream', 'downstream', or 'both'"
            )
    
        client = _create_client(include_workflow=True)
        if isinstance(client, dict):
            return client
    
        try:
            # Initialize result
            result = {
                "query": table_or_project,
                "direction": direction,
                "max_depth": max_depth,
                "lineage": {
                    "nodes": [],
                    "edges": [],
                },
                "summary": {},
            }
    
            # Determine if input is table or project
            is_table = "." in table_or_project
    
            if is_table:
                # Parse database.table format
                parts = table_or_project.split(".", 1)
                if len(parts) != 2:
                    return _format_error_response("Table must be in format: database.table")
    
                database_name, table_name = parts
    
                # Verify table exists
                try:
                    tables = client.get_tables(database_name, all_results=True)
                    table_exists = any(t.name == table_name for t in tables)
                    if not table_exists:
                        return _format_error_response(
                            f"Table '{table_or_project}' not found"
                        )
                except Exception:
                    return _format_error_response(f"Database '{database_name}' not found")
    
                # Add root node
                result["lineage"]["nodes"].append(
                    {
                        "id": table_or_project,
                        "type": "table",
                        "name": table_name,
                        "database": database_name,
                        "level": 0,
                    }
                )
    
                # Note: Full lineage tracing would require parsing SQL queries
                # from all workflows, which is beyond current scope
                result["summary"]["message"] = (
                    "Table lineage tracing requires SQL parsing from all workflows. "
                    "This is a simplified view based on available metadata."
                )
    
                # Search for workflows that might reference this table
                workflows = client.get_workflows(count=500, all_results=True)
                referencing_workflows = []
    
                for workflow in workflows:
                    # Simple heuristic: workflow name contains table name
                    if table_name.lower() in workflow.name.lower():
                        referencing_workflows.append(
                            {
                                "workflow_id": workflow.id,
                                "workflow_name": workflow.name,
                                "project": workflow.project.name,
                                "scheduled": workflow.schedule is not None,
                            }
                        )
    
                result["summary"]["referencing_workflows"] = referencing_workflows[:10]
                result["summary"]["total_references"] = len(referencing_workflows)
    
            else:
                # Project-based lineage
                # Find project
                project = None
                project_id = None
    
                if re.match(r"^\d+$", table_or_project):
                    project = client.get_project(table_or_project)
                    project_id = table_or_project
                else:
                    projects = client.get_projects(limit=200, all_results=True)
                    for p in projects:
                        if table_or_project.lower() in p.name.lower():
                            project = p
                            project_id = p.id
                            break
    
                if not project:
                    return _format_error_response(f"Project '{table_or_project}' not found")
    
                # Add root node
                result["lineage"]["nodes"].append(
                    {
                        "id": project_id,
                        "type": "project",
                        "name": project.name,
                        "level": 0,
                    }
                )
    
                # Get workflows for this project
                workflows = client.get_workflows(count=500, all_results=True)
                project_workflows = [w for w in workflows if w.project.id == project_id]
    
                # Create workflow nodes
                for workflow in project_workflows:
                    node_id = f"workflow_{workflow.id}"
                    result["lineage"]["nodes"].append(
                        {
                            "id": node_id,
                            "type": "workflow",
                            "name": workflow.name,
                            "scheduled": workflow.schedule is not None,
                            "level": 1,
                        }
                    )
    
                    # Add edge from project to workflow
                    result["lineage"]["edges"].append(
                        {
                            "from": project_id,
                            "to": node_id,
                            "type": "contains",
                        }
                    )
    
                result["summary"]["workflow_count"] = len(project_workflows)
                result["summary"]["scheduled_workflows"] = sum(
                    1 for w in project_workflows if w.schedule
                )
    
                # Note about limitations
                result["summary"]["note"] = (
                    "Full data lineage requires parsing SQL queries and workflow "
                    "definitions. This view shows project and workflow relationships."
                )
    
            return result
    
        except Exception as e:
            return _format_error_response(f"Failed to trace data lineage: {str(e)}")
  • Registers the td_trace_data_lineage function as an MCP tool using the mcp.tool() decorator.
    mcp.tool()(td_trace_data_lineage)
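The summary fields above note that full lineage would require parsing the SQL in workflow definitions. As a hedged sketch of what that first step could look like (not part of the server; a production version would use a real SQL parser such as sqlglot, since a regex misses CTEs, subqueries, and quoted identifiers):

```python
import re

def extract_referenced_tables(sql: str) -> set[str]:
    """Naive extraction of "database.table" references after FROM/JOIN.
    Illustrative only: does not handle CTE aliases, subqueries,
    quoted identifiers, or comments."""
    pattern = re.compile(r"\b(?:FROM|JOIN)\s+(\w+\.\w+)", re.IGNORECASE)
    return set(pattern.findall(sql))

sql = "SELECT a.id FROM analytics.events a JOIN raw.users u ON a.uid = u.id"
extract_referenced_tables(sql)  # {'analytics.events', 'raw.users'}
```

Running this over every workflow's queries would yield the upstream edges that the current heuristic (matching table names against workflow names) only approximates.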
