Skip to main content
Glama
kql_validator.py10.4 kB
""" KQL Query Validator Pre-execution validation to catch errors before sending queries to Kusto. Validates column names, table names, and operator syntax against schema. Author: Arjun Trivedi Email: arjuntrivedi42@yahoo.com """ import re import logging from typing import Dict, List, Any logger = logging.getLogger(__name__) class KQLValidator: """Validates KQL queries against schema before execution.""" def __init__(self, memory_manager, schema_manager): """ Initialize validator with memory and schema managers. Args: memory_manager: MemoryManager instance for schema lookup schema_manager: SchemaManager instance for live schema discovery """ self.memory = memory_manager self.schema_manager = schema_manager async def validate_query( self, query: str, cluster: str, database: str, auto_discover: bool = True ) -> Dict[str, Any]: """ Validate KQL query against schema. Args: query: KQL query to validate cluster: Cluster URL database: Database name auto_discover: Whether to auto-discover missing schemas Returns: Dict with validation results: { "valid": bool, "errors": List[str], "warnings": List[str], "suggestions": List[str], "tables_used": List[str], "columns_validated": int } """ errors = [] warnings = [] suggestions = [] try: # Extract tables from query tables = self._extract_tables(query) logger.debug("Extracted tables from query: %s", tables) if not tables: warnings.append("No tables detected in query") return { "valid": True, "errors": [], "warnings": warnings, "suggestions": ["Ensure your query references at least one table"], "tables_used": [], "columns_validated": 0 } # Ensure schemas exist for all tables if auto_discover: await self._ensure_schemas_exist(cluster, database, tables) # Validate each table and its columns columns_validated = 0 for table in tables: table_errors, table_warnings, validated_count = await self._validate_table_usage( query, table, cluster, database ) errors.extend(table_errors) warnings.extend(table_warnings) columns_validated += validated_count # Validate operator syntax syntax_errors = self._validate_operator_syntax(query) errors.extend(syntax_errors) # Generate suggestions based on errors if errors: suggestions = self._generate_suggestions(errors, tables) return { "valid": len(errors) == 0, "errors": errors, "warnings": warnings, "suggestions": suggestions, "tables_used": tables, "columns_validated": columns_validated } except Exception as e: logger.error("Validation error: %s", e, exc_info=True) return { "valid": False, "errors": [f"Validation failed: {str(e)}"], "warnings": [], "suggestions": ["Check query syntax and try again"], "tables_used": [], "columns_validated": 0 } def _extract_tables(self, query: str) -> List[str]: """Extract table names from KQL query.""" tables = set() # Pattern 1: Table at start or after pipe # Matches: TableName | ..., | TableName, etc. pattern1 = r'(?:^|\|\s*)([A-Za-z_][\w]*)\s*(?:\||$)' # Pattern 2: Join/union operations # Matches: join Table, union Table, lookup Table pattern2 = r'(?:join|union|lookup)\s+(?:kind\s*=\s*\w+\s+)?([A-Za-z_][\w]*)' # Pattern 3: Bracketed table names # Matches: ['TableName'], ["TableName"] pattern3 = r'\[[\'"]([\ w\s-]+)[\'"]\]' for pattern in [pattern1, pattern2, pattern3]: matches = re.finditer(pattern, query, re.IGNORECASE) for match in matches: table_name = match.group(1).strip() # Filter out KQL keywords if not self._is_kql_keyword(table_name): tables.add(table_name) return list(tables) def _is_kql_keyword(self, word: str) -> bool: """Check if word is a KQL keyword.""" keywords = { 'where', 'project', 'extend', 'summarize', 'take', 'limit', 'sort', 'order', 'top', 'count', 'distinct', 'join', 'union', 'let', 'as', 'by', 'on', 'and', 'or', 'not', 'in', 'has', 'contains', 'startswith', 'endswith', 'between', 'ago', 'now', 'datetime', 'timespan', 'render', 'evaluate', 'parse', 'mv-expand' } return word.lower() in keywords async def _ensure_schemas_exist( self, cluster: str, database: str, tables: List[str] ) -> None: """Ensure schemas exist for all tables, trigger discovery if needed.""" for table in tables: try: schemas = self.memory._get_database_schema(cluster, database) schema = next((s for s in schemas if s.get("table") == table), None) if not schema or not schema.get("columns"): logger.info("Auto-discovering schema for %s", table) await self.schema_manager.get_table_schema( cluster=cluster, database=database, table=table, _force_refresh=False ) except Exception as e: logger.warning("Schema discovery failed for %s: %s", table, e) async def _validate_table_usage( self, query: str, table: str, cluster: str, database: str ) -> tuple[List[str], List[str], int]: """ Validate column usage for a specific table. Returns: (errors, warnings, validated_count) """ errors: List[str] = [] warnings = [] validated_count = 0 try: # Get schema for table schemas = self.memory._get_database_schema(cluster, database) schema = next((s for s in schemas if s.get("table") == table), None) if not schema: warnings.append(f"Schema not found for table '{table}'") return errors, warnings, 0 columns = schema.get("columns", {}) if not columns: errors.append(f"No columns found in schema for '{table}'. Cannot validate query.") return errors, warnings, 0 # Extract column references for this table column_refs = self._extract_column_references(query, table) # Validate each column reference for col_ref in column_refs: if col_ref not in columns: errors.append( f"Column '{col_ref}' not found in table '{table}'. " f"Available columns: {', '.join(sorted(columns.keys()))}" ) else: validated_count += 1 return errors, warnings, validated_count except Exception as e: logger.warning("Table validation failed for %s: %s", table, e) return errors, warnings, 0 def _extract_column_references(self, query: str, _table: str) -> List[str]: """Extract column references from query.""" columns = set() # Pattern for column references in common operators # Matches: where Column, project Column, extend Column, summarize by Column patterns = [ r'(?:where|project|extend|summarize|by)\s+([A-Za-z_][\w]*)', r'([A-Za-z_][\w]*)\s*(?:==|!=|<|>|<=|>=|contains|has|startswith)', ] for pattern in patterns: matches = re.finditer(pattern, query, re.IGNORECASE) for match in matches: col_name = match.group(1).strip() if not self._is_kql_keyword(col_name): columns.add(col_name) return list(columns) def _validate_operator_syntax(self, query: str) -> List[str]: """Validate KQL operator syntax.""" errors = [] # Check for invalid negation operators with spaces invalid_patterns = [ (r'!\s+=', "Invalid negation '! ='. Use '!=' without space"), (r'!\s+contains', "Invalid negation '! contains'. Use '!contains' without space"), (r'!\s+in\b', "Invalid negation '! in'. Use '!in' without space"), (r'!\s+has\b', "Invalid negation '! has'. Use '!has' without space"), (r'!has_any', "Invalid operator '!has_any'. Use '!in' instead"), ] for pattern, error_msg in invalid_patterns: if re.search(pattern, query, re.IGNORECASE): errors.append(error_msg) return errors def _generate_suggestions(self, errors: List[str], tables: List[str]) -> List[str]: """Generate helpful suggestions based on errors.""" suggestions = [] if any("not found in table" in err for err in errors): suggestions.append("Check column names against the schema") suggestions.append("Use schema discovery to refresh table schemas") if any("negation" in err.lower() for err in errors): suggestions.append("Remove spaces in negation operators (use !=, !contains, !in, !has)") if any("!has_any" in err for err in errors): suggestions.append("Replace !has_any with !in operator") if not suggestions: suggestions.append(f"Review the query syntax for tables: {', '.join(tables)}") return suggestions

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/4R9UN/mcp-kql-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server