
bq_extract_dependencies

Extract table and column dependencies from BigQuery SQL queries to identify data relationships and validate query structure.

Instructions

Extract table and column dependencies from BigQuery SQL

Input Schema

Name      Required   Description                                    Default
sql       Yes        The SQL query to analyze
params    No         Optional query parameters (key-value pairs)
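
As an illustration of the schema above, the following sketch builds an arguments payload and checks it for the one required key. The sample query, the `region` parameter, and the helper name `missing_required` are hypothetical; only the `sql` and `params` field names come from the schema.

```python
import json

# Required keys, as stated in the input schema: only "sql" is mandatory.
REQUIRED_KEYS = ["sql"]


def missing_required(arguments: dict) -> list[str]:
    """Return the required keys that are absent from the arguments."""
    return [key for key in REQUIRED_KEYS if key not in arguments]


# Hypothetical arguments for a bq_extract_dependencies call.
example_arguments = {
    "sql": "SELECT id FROM `my-project.sales.orders` WHERE region = @region",
    "params": {"region": "EU"},  # optional key-value pairs
}

print(json.dumps(example_arguments, indent=2))
print("missing:", missing_required(example_arguments))
```

A payload that omits `params` is still valid, while one that omits `sql` is not.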

Implementation Reference

  • Core implementation of dependency extraction using regex-based parsing to identify tables and columns from the SQL query.
    def extract_dependencies(self) -> dict[str, Any]:
        """Extract table and column dependencies from the SQL query.

        Returns:
            Dictionary containing:
            - tables: List of referenced tables with project/dataset/table info
            - columns: List of referenced columns
            - dependency_graph: Mapping of tables to their referenced columns
        """
        tables = self._extract_tables()
        columns = self._extract_columns()

        # Build dependency graph
        dependency_graph = self._build_dependency_graph(tables, columns)

        return {
            "tables": tables,
            "columns": columns,
            "dependency_graph": dependency_graph,
            "table_count": len(tables),
            "column_count": len(columns),
        }
  • MCP tool wrapper function that instantiates SQLAnalyzer and delegates to its extract_dependencies method, with error handling.
    async def extract_dependencies(sql: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """
        Extract table and column dependencies from BigQuery SQL.

        Args:
            sql: The SQL query to analyze
            params: Optional query parameters (not used in static analysis)

        Returns:
            Dict with tables, columns, and dependency graph information
        """
        try:
            analyzer = SQLAnalyzer(sql)
            result = analyzer.extract_dependencies()
            return result
        except Exception as e:
            return {
                "error": {
                    "code": "ANALYSIS_ERROR",
                    "message": f"Failed to extract dependencies: {str(e)}",
                }
            }
  • Input schema definition for the bq_extract_dependencies tool, specifying required 'sql' parameter and optional 'params'.
    types.Tool(
        name="bq_extract_dependencies",
        description=("Extract table and column dependencies from BigQuery SQL"),
        inputSchema={
            "type": "object",
            "properties": {
                "sql": {
                    "type": "string",
                    "description": "The SQL query to analyze",
                },
                "params": {
                    "type": "object",
                    "description": ("Optional query parameters (key-value pairs)"),
                    "additionalProperties": True,
                },
            },
            "required": ["sql"],
        },
    ),
  • Tool dispatch logic in the MCP call_tool handler that routes requests for 'bq_extract_dependencies' to the extract_dependencies function.
    elif name == "bq_extract_dependencies":
        result = await extract_dependencies(sql=arguments["sql"], params=arguments.get("params"))
        return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
  • Helper method that uses regex patterns to extract table references from FROM and JOIN clauses, supporting various qualification formats.
    def _extract_tables(self) -> list[dict[str, str]]:
        """Extract all referenced tables from the query."""
        if self._tables_cache is not None:
            return self._tables_cache

        tables = []

        # BigQuery table reference patterns
        # Format: [project.]dataset.table or `project.dataset.table` or just table_name
        patterns = [
            # Fully qualified patterns
            (
                r"FROM\s+`([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)`",
                3,
            ),  # `project.dataset.table`
            (r"FROM\s+`([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)`", 2),  # `dataset.table`
            (
                r"FROM\s+([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)(?:\s|$)",
                3,
            ),  # project.dataset.table
            (r"FROM\s+([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)(?:\s|$)", 2),  # dataset.table
            (
                r"JOIN\s+`([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)`",
                3,
            ),  # `project.dataset.table` in JOIN
            (r"JOIN\s+`([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)`", 2),  # `dataset.table` in JOIN
            (
                r"JOIN\s+([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)(?:\s|$)",
                3,
            ),  # project.dataset.table in JOIN
            (r"JOIN\s+([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)(?:\s|$)", 2),  # dataset.table in JOIN
            # Simple table name patterns (for test compatibility)
            (
                r"FROM\s+([a-zA-Z0-9_]+)(?:\s+[a-zA-Z0-9_]+)?(?:\s|$|,)",
                1,
            ),  # Simple table name with optional alias
            (
                r"JOIN\s+([a-zA-Z0-9_]+)(?:\s+[a-zA-Z0-9_]+)?(?:\s+ON|\s|$)",
                1,
            ),  # Simple table name in JOIN
        ]

        for pattern, group_count in patterns:
            matches = re.finditer(pattern, self.sql, re.IGNORECASE)
            for match in matches:
                groups = match.groups()
                if group_count == 3:
                    # project.dataset.table format
                    tables.append(
                        {
                            "project": groups[0],
                            "dataset": groups[1],
                            "table": groups[2],
                            "full_name": f"{groups[0]}.{groups[1]}.{groups[2]}",
                            "name": f"{groups[0]}.{groups[1]}.{groups[2]}",
                        }
                    )
                elif group_count == 2:
                    # dataset.table format
                    tables.append(
                        {
                            "project": None,
                            "dataset": groups[0],
                            "table": groups[1],
                            "full_name": f"{groups[0]}.{groups[1]}",
                            "name": f"{groups[0]}.{groups[1]}",
                        }
                    )
                elif group_count == 1:
                    # Simple table name
                    table_name = groups[0]
                    # Skip if it's an alias or keyword
                    if table_name.upper() not in [
                        "AS",
                        "ON",
                        "WHERE",
                        "AND",
                        "OR",
                        "LEFT",
                        "RIGHT",
                        "INNER",
                        "FULL",
                        "CROSS",
                    ]:
                        # Check if this table was already added with full qualification
                        is_duplicate = any(t.get("table") == table_name for t in tables)
                        if not is_duplicate:
                            tables.append(
                                {
                                    "project": None,
                                    "dataset": None,
                                    "table": table_name,
                                    "full_name": table_name,
                                    "name": table_name,
                                }
                            )

        # Remove duplicates
        seen = set()
        unique_tables = []
        for table in tables:
            if table["full_name"] not in seen:
                seen.add(table["full_name"])
                unique_tables.append(table)

        self._tables_cache = unique_tables
        return unique_tables
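
To make the regex-based approach concrete, here is a minimal standalone sketch that applies two of the backtick-quoted patterns from `_extract_tables` to a sample query and then removes duplicates in the same order-preserving way. The function name `find_backticked_tables` and the sample SQL are hypothetical; this is a reduced illustration, not the server's full method.

```python
import re

# Two patterns copied from _extract_tables (backtick-quoted forms only).
FROM_PROJECT_DATASET_TABLE = r"FROM\s+`([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)`"
JOIN_DATASET_TABLE = r"JOIN\s+`([a-zA-Z0-9_-]+)\.([a-zA-Z0-9_-]+)`"


def find_backticked_tables(sql: str) -> list[str]:
    """Return unique dotted table names matched by the two patterns above."""
    found = []
    for pattern in (FROM_PROJECT_DATASET_TABLE, JOIN_DATASET_TABLE):
        for match in re.finditer(pattern, sql, re.IGNORECASE):
            found.append(".".join(match.groups()))

    # Order-preserving de-duplication, mirroring the "seen" set in the original.
    seen = set()
    unique = []
    for name in found:
        if name not in seen:
            seen.add(name)
            unique.append(name)
    return unique


# Hypothetical query: one fully qualified table and one table joined twice.
SAMPLE_SQL = """
SELECT o.id, c.name
FROM `my-project.sales.orders` o
JOIN `crm.customers` c ON o.customer_id = c.id
JOIN `crm.customers` c2 ON o.referrer_id = c2.id
"""

tables = find_backticked_tables(SAMPLE_SQL)
print(tables)  # ['my-project.sales.orders', 'crm.customers']
```

Because each pattern is anchored on a literal `FROM` or `JOIN` keyword, this style of matching only sees table references that appear directly after those keywords.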

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/caron14/mcp-bigquery'
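
The same request can be sketched in Python with only the standard library. The helper names `build_server_request` and `fetch_server` are hypothetical, and the sketch assumes the endpoint returns a JSON body, which the page does not state explicitly.

```python
import json
import urllib.request

# Base URL taken from the curl command above.
API_BASE = "https://glama.ai/api/mcp/v1/servers"


def build_server_request(owner: str, name: str) -> urllib.request.Request:
    """Build a GET request for one server entry in the MCP directory API."""
    return urllib.request.Request(f"{API_BASE}/{owner}/{name}", method="GET")


def fetch_server(owner: str, name: str, timeout: float = 10.0) -> dict:
    """Send the request and decode the body (assumed to be JSON)."""
    with urllib.request.urlopen(build_server_request(owner, name), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


request = build_server_request("caron14", "mcp-bigquery")
print(request.get_method(), request.full_url)

# Network call, left commented out so the sketch runs offline:
# info = fetch_server("caron14", "mcp-bigquery")
```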

If you have feedback or need assistance with the MCP directory API, please join our Discord server.