td_trace_data_lineage
Trace data dependencies to analyze impact, identify sources, and map data flow between tables and projects in Treasure Data.
Instructions
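Map data flow to see what feeds into or depends on tables and projects.
This is critical for understanding data dependencies and for impact analysis.
The tool is intended to trace through SQL queries to build a dependency graph; the current implementation returns a simplified view built from metadata and name-based heuristics.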
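Extracting table references from SQL is the first step of this kind of tracing. The following is a minimal sketch that assumes queries of simple shapes, such as `INSERT INTO db.target SELECT ... FROM db.source JOIN db.other` or `CREATE TABLE db.target AS SELECT ...`. `extract_edges` is a hypothetical helper. Its regular expressions are a heuristic stand-in for a real SQL parser: they ignore CTEs, quoted identifiers, and subqueries.

```python
import re

# Heuristic patterns only; a production version would use a real SQL parser.
_TARGET = re.compile(
    r"\b(?:INSERT\s+(?:INTO|OVERWRITE\s+TABLE)|CREATE\s+TABLE)\s+([\w.]+)",
    re.IGNORECASE,
)
_SOURCE = re.compile(r"\b(?:FROM|JOIN)\s+([\w.]+)", re.IGNORECASE)


def extract_edges(sql: str) -> list[tuple[str, str]]:
    """Return (source, target) table pairs for one write query."""
    target_match = _TARGET.search(sql)
    if target_match is None:
        return []  # read-only query: nothing is written, so no edge
    target = target_match.group(1)
    sources: list[str] = []
    for name in _SOURCE.findall(sql):
        # Keep only qualified "database.table" names (aliases and CTE names
        # are usually unqualified), skip the target, and de-duplicate in order.
        if "." in name and name != target and name not in sources:
            sources.append(name)
    return [(src, target) for src in sources]
```

Each returned `(source, target)` pair is one edge of the dependency graph. Collecting the pairs from every query in every workflow gives a table-level graph.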
Common scenarios:
- "What happens if I change this table?" - Impact analysis
- "Where does this data come from?" - Source tracing
- Data quality issues - Track upstream problems
- Migration planning - Understand dependencies
- Documentation - Data flow diagrams
Directions:
- upstream: What tables/projects feed INTO this
- downstream: What tables/projects CONSUME this
- both: Complete dependency graph
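The three directions amount to a breadth-first walk over table-level edges, bounded by a maximum depth (the `max_depth` argument). The following is a minimal sketch that assumes the edges are already available as `(source, target)` pairs. `trace` and `_walk` are hypothetical helpers. "both" is modelled as the union of an upstream-only and a downstream-only walk, so sibling consumers of an upstream table are not pulled in.

```python
from collections import deque


def _walk(adjacency: dict, start: str, max_depth: int) -> dict:
    """Breadth-first walk returning {table: hops} within max_depth hops."""
    hops = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if hops[node] == max_depth:
            continue  # depth limit reached: do not expand this node
        for nxt in adjacency.get(node, []):
            if nxt not in hops:  # BFS: first visit is the shortest distance
                hops[nxt] = hops[node] + 1
                queue.append(nxt)
    return hops


def trace(edges, start: str, direction: str = "both", max_depth: int = 3) -> dict:
    """Return {table: level}; upstream levels are negative, downstream positive."""
    if direction not in ("upstream", "downstream", "both"):
        raise ValueError("direction must be 'upstream', 'downstream', or 'both'")
    consumers: dict = {}
    sources: dict = {}
    for src, dst in edges:
        consumers.setdefault(src, []).append(dst)
        sources.setdefault(dst, []).append(src)

    levels = {start: 0}
    if direction in ("upstream", "both"):
        for table, hops in _walk(sources, start, max_depth).items():
            levels.setdefault(table, -hops)
    if direction in ("downstream", "both"):
        # setdefault keeps the upstream level if a cycle makes a table both.
        for table, hops in _walk(consumers, start, max_depth).items():
            levels.setdefault(table, hops)
    return levels
```

For example, `trace(edges, "stg.events", "downstream")` answers "What happens if I change this table?", and the `"upstream"` direction answers "Where does this data come from?".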
Returns a visual-ready dependency tree with table/project relationships.
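The `nodes` and `edges` lists in the returned `lineage` object map directly onto diagram formats. The following is a minimal sketch that renders them as a Mermaid flowchart. Mermaid and the `to_mermaid` helper are assumptions, not something the tool emits. The sketch relies only on the `id`, `name`, `from`, `to`, and `type` keys that the implementation populates.

```python
import re


def to_mermaid(lineage: dict) -> str:
    """Render a {"nodes": [...], "edges": [...]} lineage dict as Mermaid text."""

    def safe(node_id) -> str:
        # Mermaid node IDs work best as plain identifiers: replace ".", "-",
        # etc., and prefix them so IDs such as "42" or "end" stay valid.
        return "n_" + re.sub(r"\W", "_", str(node_id))

    lines = ["flowchart LR"]
    for node in lineage["nodes"]:
        # Show the human-readable name; swap quotes so the label stays intact.
        label = str(node.get("name", node["id"])).replace('"', "'")
        lines.append(f'    {safe(node["id"])}["{label}"]')
    for edge in lineage["edges"]:
        kind = edge.get("type")
        arrow = f"-->|{kind}|" if kind else "-->"
        lines.append(f"    {safe(edge['from'])} {arrow} {safe(edge['to'])}")
    return "\n".join(lines)
```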
Args:
table_or_project: Table name (format: "database.table") or project name/ID
direction: "upstream" (sources), "downstream" (consumers), or "both"
max_depth: Maximum levels to trace (default: 3)
Returns:
Data lineage graph with dependencies and data flow information
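The `table_or_project` argument is overloaded. The implementation uses simple rules to decide which kind of identifier it received:

- A value containing `.` is treated as `database.table`.
- An all-digit value is treated as a project ID.
- Anything else is matched against project names.

The following is a standalone sketch of that dispatch. `classify_identifier` is a hypothetical name. Stripping whitespace and rejecting empty halves such as `"db."` are extra checks that the handler does not perform.

```python
import re


def classify_identifier(value: str) -> tuple[str, ...]:
    """Return ("table", db, tbl), ("project_id", id) or ("project_name", name)."""
    value = value.strip()  # extra: the handler strips only for its emptiness check
    if not value:
        raise ValueError("Table or project identifier cannot be empty")
    if "." in value:
        # Split on the first dot only, as the handler does.
        database, table = value.split(".", 1)
        if not database or not table:  # extra check, not in the handler
            raise ValueError("Table must be in format: database.table")
        return ("table", database, table)
    if re.fullmatch(r"\d+", value):
        return ("project_id", value)
    return ("project_name", value)
```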
Input Schema
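| Name | Required | Description | Default |
|---|---|---|---|
| table_or_project | Yes | Table name (format: "database.table") or project name/ID | |
| direction | No | "upstream" (sources), "downstream" (consumers), or "both" | both |
| max_depth | No | Maximum levels to trace | 3 |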
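The defaults in the table can be applied and checked before any API call is made. The following is a minimal sketch. `normalize_args` is hypothetical. The `max_depth` check is an added assumption, because the implementation copies `max_depth` into its result without validating it.

```python
VALID_DIRECTIONS = ("upstream", "downstream", "both")


def normalize_args(args: dict) -> dict:
    """Apply the schema defaults and validate the tool arguments."""
    target = args.get("table_or_project")
    # Required: must be a non-blank string (same rule as the handler).
    if not isinstance(target, str) or not target.strip():
        raise ValueError("Table or project identifier cannot be empty")
    direction = args.get("direction", "both")
    if direction not in VALID_DIRECTIONS:
        raise ValueError("Direction must be 'upstream', 'downstream', or 'both'")
    max_depth = args.get("max_depth", 3)
    # Assumption: depth must be an int >= 1. bool is rejected explicitly
    # because isinstance(True, int) is True in Python.
    if isinstance(max_depth, bool) or not isinstance(max_depth, int) or max_depth < 1:
        raise ValueError("max_depth must be a positive integer")
    return {"table_or_project": target, "direction": direction, "max_depth": max_depth}
```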
Implementation Reference
- The main handler function for the `td_trace_data_lineage` tool. It traces data lineage for a given table or project and builds a dependency graph of nodes and edges. It validates the `direction` argument, but the current implementation is simplified. It does not parse SQL. Instead, it builds the graph from metadata and name-based heuristics. It includes input validation, client interaction, and structured output.

```python
import re
from typing import Any

# _format_error_response and _create_client are helpers defined elsewhere in the module.


async def td_trace_data_lineage(
    table_or_project: str,
    direction: str = "both",
    max_depth: int = 3,
) -> dict[str, Any]:
    """Map data flow to see what feeds into or depends on tables/projects.

    Critical for understanding data dependencies and impact analysis.
    Traces through SQL queries to build dependency graph.

    Common scenarios:
    - "What happens if I change this table?" - Impact analysis
    - "Where does this data come from?" - Source tracing
    - Data quality issues - Track upstream problems
    - Migration planning - Understand dependencies
    - Documentation - Data flow diagrams

    Directions:
    - upstream: What tables/projects feed INTO this
    - downstream: What tables/projects CONSUME this
    - both: Complete dependency graph

    Returns visual-ready dependency tree with table/project relationships.

    Args:
        table_or_project: Table name (format: "database.table") or project name/ID
        direction: "upstream" (sources), "downstream" (consumers), or "both"
        max_depth: Maximum levels to trace (default: 3)

    Returns:
        Data lineage graph with dependencies and data flow information
    """
    if not table_or_project or not table_or_project.strip():
        return _format_error_response("Table or project identifier cannot be empty")

    if direction not in ["upstream", "downstream", "both"]:
        return _format_error_response(
            "Direction must be 'upstream', 'downstream', or 'both'"
        )

    client = _create_client(include_workflow=True)
    if isinstance(client, dict):
        return client  # client creation failed; pass the error response through

    try:
        # Initialize result
        result = {
            "query": table_or_project,
            "direction": direction,
            "max_depth": max_depth,
            "lineage": {
                "nodes": [],
                "edges": [],
            },
            "summary": {},
        }

        # Determine if input is table or project
        is_table = "." in table_or_project

        if is_table:
            # Parse database.table format
            parts = table_or_project.split(".", 1)
            if len(parts) != 2:
                return _format_error_response("Table must be in format: database.table")

            database_name, table_name = parts

            # Verify table exists
            try:
                tables = client.get_tables(database_name, all_results=True)
                table_exists = any(t.name == table_name for t in tables)
                if not table_exists:
                    return _format_error_response(
                        f"Table '{table_or_project}' not found"
                    )
            except Exception:
                return _format_error_response(f"Database '{database_name}' not found")

            # Add root node
            result["lineage"]["nodes"].append(
                {
                    "id": table_or_project,
                    "type": "table",
                    "name": table_name,
                    "database": database_name,
                    "level": 0,
                }
            )

            # Note: Full lineage tracing would require parsing SQL queries
            # from all workflows, which is beyond current scope
            result["summary"]["message"] = (
                "Table lineage tracing requires SQL parsing from all workflows. "
                "This is a simplified view based on available metadata."
            )

            # Search for workflows that might reference this table
            workflows = client.get_workflows(count=500, all_results=True)
            referencing_workflows = []

            for workflow in workflows:
                # Simple heuristic: workflow name contains table name
                if table_name.lower() in workflow.name.lower():
                    referencing_workflows.append(
                        {
                            "workflow_id": workflow.id,
                            "workflow_name": workflow.name,
                            "project": workflow.project.name,
                            "scheduled": workflow.schedule is not None,
                        }
                    )

            result["summary"]["referencing_workflows"] = referencing_workflows[:10]
            result["summary"]["total_references"] = len(referencing_workflows)

        else:
            # Project-based lineage: find the project by numeric ID or by name
            project = None
            project_id = None

            if re.match(r"^\d+$", table_or_project):
                project = client.get_project(table_or_project)
                project_id = table_or_project
            else:
                projects = client.get_projects(limit=200, all_results=True)
                for p in projects:
                    # First project whose name contains the query (case-insensitive)
                    if table_or_project.lower() in p.name.lower():
                        project = p
                        project_id = p.id
                        break

            if not project:
                return _format_error_response(f"Project '{table_or_project}' not found")

            # Add root node
            result["lineage"]["nodes"].append(
                {
                    "id": project_id,
                    "type": "project",
                    "name": project.name,
                    "level": 0,
                }
            )

            # Get workflows for this project
            workflows = client.get_workflows(count=500, all_results=True)
            project_workflows = [w for w in workflows if w.project.id == project_id]

            # Create workflow nodes
            for workflow in project_workflows:
                node_id = f"workflow_{workflow.id}"
                result["lineage"]["nodes"].append(
                    {
                        "id": node_id,
                        "type": "workflow",
                        "name": workflow.name,
                        "scheduled": workflow.schedule is not None,
                        "level": 1,
                    }
                )

                # Add edge from project to workflow
                result["lineage"]["edges"].append(
                    {
                        "from": project_id,
                        "to": node_id,
                        "type": "contains",
                    }
                )

            result["summary"]["workflow_count"] = len(project_workflows)
            result["summary"]["scheduled_workflows"] = sum(
                1 for w in project_workflows if w.schedule
            )

            # Note about limitations
            result["summary"]["note"] = (
                "Full data lineage requires parsing SQL queries and workflow "
                "definitions. This view shows project and workflow relationships."
            )

        return result

    except Exception as e:
        return _format_error_response(f"Failed to trace data lineage: {str(e)}")
```
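The project branch above produces a two-level graph:

- The project is the root node, at level 0.
- Each workflow becomes a `workflow_<id>` node at level 1.
- `contains` edges connect the project to each workflow.

The following is a self-contained sketch of that construction. It uses hypothetical `Project` and `Workflow` dataclasses in place of the Treasure Data client objects, and models only the attributes the handler reads.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Project:  # hypothetical stand-in for the client's project object
    id: str
    name: str


@dataclass
class Workflow:  # hypothetical stand-in for the client's workflow object
    id: str
    name: str
    project: Project
    schedule: Optional[dict] = None


def project_lineage(project: Project, workflows: list[Workflow]) -> dict[str, Any]:
    """Build the project -> workflow graph that the handler returns for projects."""
    owned = [w for w in workflows if w.project.id == project.id]
    nodes = [{"id": project.id, "type": "project", "name": project.name, "level": 0}]
    edges = []
    for wf in owned:
        node_id = f"workflow_{wf.id}"
        nodes.append({
            "id": node_id,
            "type": "workflow",
            "name": wf.name,
            "scheduled": wf.schedule is not None,
            "level": 1,
        })
        edges.append({"from": project.id, "to": node_id, "type": "contains"})
    summary = {
        "workflow_count": len(owned),
        "scheduled_workflows": sum(1 for w in owned if w.schedule is not None),
    }
    return {"lineage": {"nodes": nodes, "edges": edges}, "summary": summary}
```

The sketch uses `is not None` both for the per-node `scheduled` flag and for the count. The handler uses a truthiness test (`if w.schedule`) for the count, so the two can disagree only when a schedule value is empty.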
- `td_mcp_server/diagnostic_tools.py:27` (registration): registers the `td_trace_data_lineage` function as an MCP tool using the `mcp.tool()` decorator.

```python
mcp.tool()(td_trace_data_lineage)
```
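`mcp.tool()(td_trace_data_lineage)` is the call form of a decorator factory. `mcp.tool()` returns a decorator, which is then applied to the function, just as `@mcp.tool()` would be at definition time. Calling it explicitly lets the handler be defined in one module and registered in another. The following dependency-free sketch uses a hypothetical `ToolRegistry` in place of the MCP server object. It illustrates the pattern only and is not the MCP SDK's implementation.

```python
from typing import Callable


class ToolRegistry:
    """Hypothetical stand-in for an MCP server's tool registry."""

    def __init__(self) -> None:
        self.tools: dict[str, Callable] = {}

    def tool(self) -> Callable[[Callable], Callable]:
        # Decorator factory: calling tool() returns the actual decorator.
        def decorator(fn: Callable) -> Callable:
            self.tools[fn.__name__] = fn
            return fn  # return the function unchanged so it stays callable
        return decorator


mcp = ToolRegistry()


def td_trace_data_lineage(table_or_project: str, direction: str = "both", max_depth: int = 3) -> dict:
    # Stub handler for illustration only.
    return {"query": table_or_project, "direction": direction, "max_depth": max_depth}


# Explicit call form, as in diagnostic_tools.py ...
mcp.tool()(td_trace_data_lineage)


# ... is equivalent to applying the decorator at definition time.
@mcp.tool()
def another_tool() -> str:
    return "ok"
```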