Treasure Data MCP Server

by knishioka

td_trace_data_lineage

Trace data dependencies to analyze impact, identify sources, and map data flow between tables and projects in Treasure Data.

Instructions

Map data flow to see what feeds into or depends on tables/projects.

Critical for understanding data dependencies and impact analysis.
Traces through SQL queries to build dependency graph.

Common scenarios:
- "What happens if I change this table?" - Impact analysis
- "Where does this data come from?" - Source tracing
- Data quality issues - Track upstream problems
- Migration planning - Understand dependencies
- Documentation - Data flow diagrams

Directions:
- upstream: What tables/projects feed INTO this
- downstream: What tables/projects CONSUME this
- both: Complete dependency graph

Returns visual-ready dependency tree with table/project relationships.

Args:
    table_or_project: Table name (format: "database.table") or project name/ID
    direction: "upstream" (sources), "downstream" (consumers), or "both"
    max_depth: Maximum levels to trace (default: 3)

Returns:
    Data lineage graph with dependencies and data flow information
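To make the direction and max_depth semantics concrete, here is a minimal sketch of a depth-limited traversal over a dependency edge list. The edge data, the table names, and the `trace` and `_walk` helpers are hypothetical illustrations and are not part of the server, whose handler takes a simpler, heuristic approach.

```python
from collections import deque

# Hypothetical edges: (source, consumer) means data flows source -> consumer.
EDGES = [
    ("raw.events", "staging.events"),
    ("staging.events", "mart.daily_events"),
    ("mart.daily_events", "mart.weekly_report"),
    ("raw.users", "mart.daily_events"),
]


def _walk(start, edges, follow_downstream, max_depth):
    """Breadth-first walk in one direction; returns {node: level}."""
    levels = {}
    seen = {start}
    queue = deque([(start, 0)])
    while queue:
        node, depth = queue.popleft()
        if depth >= max_depth:
            continue  # do not expand beyond the requested depth
        for source, consumer in edges:
            if follow_downstream:
                current, neighbor = source, consumer
            else:
                current, neighbor = consumer, source
            if current == node and neighbor not in seen:
                seen.add(neighbor)
                levels[neighbor] = depth + 1
                queue.append((neighbor, depth + 1))
    return levels


def trace(start, direction="both", max_depth=3, edges=EDGES):
    """Trace sources (upstream), consumers (downstream), or both."""
    if direction not in ("upstream", "downstream", "both"):
        raise ValueError("direction must be 'upstream', 'downstream', or 'both'")
    result = {}
    if direction in ("upstream", "both"):
        result["upstream"] = _walk(start, edges, False, max_depth)
    if direction in ("downstream", "both"):
        result["downstream"] = _walk(start, edges, True, max_depth)
    return result
```

In this sketch, "both" is the union of a separate upstream walk and a separate downstream walk, so sibling sources of a downstream table are not pulled in.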

Input Schema

Name              Required  Description  Default
table_or_project  Yes       -            -
direction         No        -            both
max_depth         No        -            -
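
A client could check its arguments against this schema before sending them. The sketch below builds the `params` object of an MCP `tools/call` request. `build_call_params` is a hypothetical helper, the max_depth default of 3 comes from the Args text rather than from the schema table, and the non-negative check on max_depth is an assumption.

```python
VALID_DIRECTIONS = ("upstream", "downstream", "both")


def build_call_params(table_or_project, direction="both", max_depth=3):
    """Build the params of an MCP tools/call request for td_trace_data_lineage."""
    if not isinstance(table_or_project, str) or not table_or_project.strip():
        raise ValueError("table_or_project is required and cannot be empty")
    if direction not in VALID_DIRECTIONS:
        raise ValueError("direction must be 'upstream', 'downstream', or 'both'")
    # Assumption: a negative depth is treated as a caller error.
    if isinstance(max_depth, bool) or not isinstance(max_depth, int) or max_depth < 0:
        raise ValueError("max_depth must be a non-negative integer")
    return {
        "name": "td_trace_data_lineage",
        "arguments": {
            "table_or_project": table_or_project.strip(),
            "direction": direction,
            "max_depth": max_depth,
        },
    }
```

For example, `build_call_params("sales.orders", direction="upstream")` yields arguments with max_depth left at 3.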

Implementation Reference

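  • The main handler function for the td_trace_data_lineage tool. It validates the input, creates a client, and returns a structured result containing lineage nodes and edges plus a summary. The implementation is simplified and relies on metadata and heuristics rather than SQL parsing: for a table it lists workflows whose names contain the table name, and for a project it lists the project's workflows. The direction argument is validated and max_depth is accepted, and both are echoed back in the result, but neither changes what is traced.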
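    import re
    from typing import Any

    # _create_client and _format_error_response are helpers not shown in this excerpt.
    async def td_trace_data_lineage(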
        table_or_project: str,
        direction: str = "both",
        max_depth: int = 3,
    ) -> dict[str, Any]:
        """Map data flow to see what feeds into or depends on tables/projects.
    
        Critical for understanding data dependencies and impact analysis.
        Traces through SQL queries to build dependency graph.
    
        Common scenarios:
        - "What happens if I change this table?" - Impact analysis
        - "Where does this data come from?" - Source tracing
        - Data quality issues - Track upstream problems
        - Migration planning - Understand dependencies
        - Documentation - Data flow diagrams
    
        Directions:
        - upstream: What tables/projects feed INTO this
        - downstream: What tables/projects CONSUME this
        - both: Complete dependency graph
    
        Returns visual-ready dependency tree with table/project relationships.
    
        Args:
            table_or_project: Table name (format: "database.table") or project name/ID
            direction: "upstream" (sources), "downstream" (consumers), or "both"
            max_depth: Maximum levels to trace (default: 3)
    
        Returns:
            Data lineage graph with dependencies and data flow information
        """
        if not table_or_project or not table_or_project.strip():
            return _format_error_response("Table or project identifier cannot be empty")
    
        if direction not in ["upstream", "downstream", "both"]:
            return _format_error_response(
                "Direction must be 'upstream', 'downstream', or 'both'"
            )
    
        client = _create_client(include_workflow=True)
        if isinstance(client, dict):
            return client
    
        try:
            # Initialize result
            result = {
                "query": table_or_project,
                "direction": direction,
                "max_depth": max_depth,
                "lineage": {
                    "nodes": [],
                    "edges": [],
                },
                "summary": {},
            }
    
            # Determine if input is table or project
            is_table = "." in table_or_project
    
            if is_table:
                # Parse database.table format
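                # split(".", 1) always yields two parts here because "." is present,
                # so check for an empty database or table name instead
                database_name, table_name = table_or_project.split(".", 1)
                if not database_name or not table_name:
                    return _format_error_response("Table must be in format: database.table")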
    
                # Verify table exists
                try:
                    tables = client.get_tables(database_name, all_results=True)
                    table_exists = any(t.name == table_name for t in tables)
                    if not table_exists:
                        return _format_error_response(
                            f"Table '{table_or_project}' not found"
                        )
                except Exception:
                    return _format_error_response(f"Database '{database_name}' not found")
    
                # Add root node
                result["lineage"]["nodes"].append(
                    {
                        "id": table_or_project,
                        "type": "table",
                        "name": table_name,
                        "database": database_name,
                        "level": 0,
                    }
                )
    
                # Note: Full lineage tracing would require parsing SQL queries
                # from all workflows, which is beyond current scope
                result["summary"]["message"] = (
                    "Table lineage tracing requires SQL parsing from all workflows. "
                    "This is a simplified view based on available metadata."
                )
    
                # Search for workflows that might reference this table
                workflows = client.get_workflows(count=500, all_results=True)
                referencing_workflows = []
    
                for workflow in workflows:
                    # Simple heuristic: workflow name contains table name
                    if table_name.lower() in workflow.name.lower():
                        referencing_workflows.append(
                            {
                                "workflow_id": workflow.id,
                                "workflow_name": workflow.name,
                                "project": workflow.project.name,
                                "scheduled": workflow.schedule is not None,
                            }
                        )
    
                result["summary"]["referencing_workflows"] = referencing_workflows[:10]
                result["summary"]["total_references"] = len(referencing_workflows)
    
            else:
                # Project-based lineage
                # Find project
                project = None
                project_id = None
    
                if re.match(r"^\d+$", table_or_project):
                    project = client.get_project(table_or_project)
                    project_id = table_or_project
                else:
                    projects = client.get_projects(limit=200, all_results=True)
                    for p in projects:
                        if table_or_project.lower() in p.name.lower():
                            project = p
                            project_id = p.id
                            break
    
                if not project:
                    return _format_error_response(f"Project '{table_or_project}' not found")
    
                # Add root node
                result["lineage"]["nodes"].append(
                    {
                        "id": project_id,
                        "type": "project",
                        "name": project.name,
                        "level": 0,
                    }
                )
    
                # Get workflows for this project
                workflows = client.get_workflows(count=500, all_results=True)
                project_workflows = [w for w in workflows if w.project.id == project_id]
    
                # Create workflow nodes
                for workflow in project_workflows:
                    node_id = f"workflow_{workflow.id}"
                    result["lineage"]["nodes"].append(
                        {
                            "id": node_id,
                            "type": "workflow",
                            "name": workflow.name,
                            "scheduled": workflow.schedule is not None,
                            "level": 1,
                        }
                    )
    
                    # Add edge from project to workflow
                    result["lineage"]["edges"].append(
                        {
                            "from": project_id,
                            "to": node_id,
                            "type": "contains",
                        }
                    )
    
                result["summary"]["workflow_count"] = len(project_workflows)
                result["summary"]["scheduled_workflows"] = sum(
                    1 for w in project_workflows if w.schedule
                )
    
                # Note about limitations
                result["summary"]["note"] = (
                    "Full data lineage requires parsing SQL queries and workflow "
                    "definitions. This view shows project and workflow relationships."
                )
    
            return result
    
        except Exception as e:
            return _format_error_response(f"Failed to trace data lineage: {str(e)}")
  • Registers the td_trace_data_lineage function as an MCP tool using the mcp.tool() decorator.
    mcp.tool()(td_trace_data_lineage)
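
The handler returns nodes and edges rather than a drawn tree, so a caller has to render them. The following is a minimal sketch of one way to do that. `render_lineage` and the sample data are hypothetical, with the sample shaped like the output of the handler's project branch.

```python
def render_lineage(result):
    """Return an indented text tree built from a lineage result's nodes and edges."""
    nodes = {node["id"]: node for node in result["lineage"]["nodes"]}
    children = {}
    for edge in result["lineage"]["edges"]:
        children.setdefault(edge["from"], []).append(edge["to"])

    lines = []

    def visit(node_id, depth, seen):
        node = nodes[node_id]
        lines.append(f"{'  ' * depth}{node['type']}: {node['name']}")
        for child_id in children.get(node_id, []):
            # Skip dangling edges and guard against cycles.
            if child_id in nodes and child_id not in seen:
                visit(child_id, depth + 1, seen | {child_id})

    for node in result["lineage"]["nodes"]:
        if node["level"] == 0:  # root nodes are added at level 0
            visit(node["id"], 0, {node["id"]})
    return "\n".join(lines)


# Hypothetical sample shaped like the handler's project branch output.
sample = {
    "query": "sales_pipeline",
    "direction": "both",
    "max_depth": 3,
    "lineage": {
        "nodes": [
            {"id": "101", "type": "project", "name": "sales_pipeline", "level": 0},
            {"id": "workflow_7", "type": "workflow", "name": "daily_load",
             "scheduled": True, "level": 1},
            {"id": "workflow_8", "type": "workflow", "name": "backfill",
             "scheduled": False, "level": 1},
        ],
        "edges": [
            {"from": "101", "to": "workflow_7", "type": "contains"},
            {"from": "101", "to": "workflow_8", "type": "contains"},
        ],
    },
    "summary": {"workflow_count": 2, "scheduled_workflows": 1},
}

print(render_lineage(sample))
```

The shape of an error response is not shown in the excerpt, so a caller would likely want to confirm that the "lineage" key is present before rendering.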
Behavior: 4/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries the full burden of behavioral disclosure. It effectively describes what the tool does (traces through SQL queries to build dependency graph), what it returns (visual-ready dependency tree), and its operational characteristics (default values for direction and max_depth). It doesn't mention performance characteristics like rate limits or authentication requirements, but provides substantial behavioral context.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is well-structured with clear sections (purpose, scenarios, directions, args, returns) and front-loaded with the core functionality. While somewhat lengthy, every section adds value. The 'Common scenarios' section could be more concise, but overall the structure enhances readability.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 4/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

For a 3-parameter tool with no annotations and no output schema, the description provides comprehensive context. It covers purpose, usage scenarios, parameter semantics, and return format. The main gap is the lack of output schema details, but the description adequately explains what the tool returns ('visual-ready dependency tree with table/project relationships').
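
Since no output schema is published, the result shape has to be read from the handler code. The sketch below records that shape as type hints. The class names are hypothetical, the field list is inferred from the success path of the implementation reference only, and the type of a project id depends on the client, so it is left loose.

```python
from typing import Any, TypedDict, Union


class LineageNode(TypedDict, total=False):
    id: Union[str, int]  # table id is "database.table"; project id type is client-dependent
    type: str            # "table", "project", or "workflow"
    name: str
    level: int           # 0 for the root, 1 for a project's workflows
    database: str        # table nodes only
    scheduled: bool      # workflow nodes only


# "from" is a Python keyword, so the functional TypedDict syntax is required here.
LineageEdge = TypedDict(
    "LineageEdge", {"from": Union[str, int], "to": str, "type": str}
)


class LineageGraph(TypedDict):
    nodes: list[LineageNode]
    edges: list[LineageEdge]


class LineageResult(TypedDict):
    query: str
    direction: str
    max_depth: int
    lineage: LineageGraph
    summary: dict[str, Any]
```

These hints are only checked by a static type checker; at runtime the result is an ordinary dict.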

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 5/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

With 0% schema description coverage, the description fully compensates by providing detailed parameter semantics. It explains table_or_project format ('database.table'), direction options and their meanings (upstream=sources, downstream=consumers, both=complete graph), and max_depth behavior ('Maximum levels to trace'). This adds significant value beyond the bare schema.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose with specific verbs ('Map data flow', 'Traces through SQL queries') and resources ('tables/projects'). It distinguishes this tool from siblings by focusing on data lineage/dependency analysis rather than general analysis, search, or listing operations found in other tools like td_analyze_execution or td_list_tables.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 5/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides explicit guidance on when to use this tool through 'Common scenarios' (impact analysis, source tracing, data quality issues, migration planning, documentation) and 'Directions' (upstream, downstream, both). It clearly defines the tool's specific use cases without being misleading about alternatives.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/knishioka/td-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.