Skip to main content
Glama

MCP KQL Server

memory.py138 kB
""" Unified Schema Memory System for MCP KQL Server This module provides a unified schema memory system that: - Uses AI-friendly special tokens (e.g., @@CLUSTER@@, ##DATABASE##) - Supports a two-step flow: execute KQL first, then discover/update schema context - Prevents context size bloat via intelligent compression - Provides schema memory used for AI-driven query generation Author: Arjun Trivedi Email: arjuntrivedi42@yahoo.com """ import json import logging import os import re import threading from datetime import datetime from functools import lru_cache from pathlib import Path from typing import Any, Dict, List, Optional, Union, Set, Tuple from dataclasses import dataclass # FastMCP imports removed - using programmatic description generation instead logger = logging.getLogger(__name__) @dataclass class ValidationResult: """Result of schema validation""" is_valid: bool errors: List[str] warnings: List[str] suggestions: List[str] validated_query: str tables_used: Set[str] columns_used: Dict[str, Set[str]] # table -> columns # Global lock for thread-safe memory operations _memory_lock = threading.RLock() # FastMCP initialization removed - using programmatic description generation # Enhanced AI-Friendly Special Tokens with XML-style structure SPECIAL_TOKENS = { "CLUSTER_START": "<CLUSTER>", "CLUSTER_END": "</CLUSTER>", "DATABASE_START": "<DB>", "DATABASE_END": "</DB>", "TABLE_START": "<TABLE>", "TABLE_END": "</TABLE>", "COLUMN_START": "<COL>", "COLUMN_END": "</COL>", "TYPE_START": "<TYPE>", "TYPE_END": "</TYPE>", "DESCRIPTION_START": "<DESC>", "DESCRIPTION_END": "</DESC>", "SUMMARY_START": "<SUMMARY>", "SUMMARY_END": "</SUMMARY>", "TAGS_START": "<TAGS>", "TAGS_END": "</TAGS>", "SAMPLES_START": "<SAMPLES>", "SAMPLES_END": "</SAMPLES>", "QUERY_START": "<QUERY>", "QUERY_END": "</QUERY>", } class ContextSelector: """ Intelligent context selection for query generation. Implements the enhanced schema context management recommended in the analysis. """ def select_relevant_context(self, query: str, all_schemas: Dict) -> List[str]: """Select only relevant schema context using intelligent scoring.""" # Parse query intent intent = self._parse_query_intent(query) # Score each schema for relevance scored_schemas = [] for table, schema in all_schemas.items(): score = self._calculate_relevance_score(intent, schema, table) scored_schemas.append((score, table, schema)) # Select top relevant schemas within token limit selected = [] token_count = 0 max_tokens = 4000 for score, table, schema in sorted(scored_schemas, reverse=True): schema_token = self._create_compact_token(schema, table) if token_count + len(schema_token) <= max_tokens: selected.append(schema_token) token_count += len(schema_token) else: break return selected def _parse_query_intent(self, query: str) -> Dict[str, Any]: """Parse query to understand user intent.""" intent = { "operation_types": [], "mentioned_tables": [], "mentioned_columns": [], "temporal_focus": False, "aggregation_focus": False, "filtering_focus": False } if not query: return intent query_lower = query.lower() # Detect operation types if any(op in query_lower for op in ['summarize', 'count', 'sum', 'avg']): intent["aggregation_focus"] = True intent["operation_types"].append("aggregation") if any(op in query_lower for op in ['where', 'filter']): intent["filtering_focus"] = True intent["operation_types"].append("filtering") if any(op in query_lower for op in ['ago(', 'between', 'timespan']): intent["temporal_focus"] = True intent["operation_types"].append("temporal") # Extract mentioned entities (simplified) words = query.split() for word in words: if word.isalpha() and len(word) > 3: intent["mentioned_columns"].append(word) return intent def _calculate_relevance_score(self, intent: Dict[str, Any], schema: Dict[str, Any], table: str) -> float: """Calculate relevance score for a schema based on query intent.""" score = 0.0 # Base score for having columns columns = schema.get("columns", {}) if columns: score += 1.0 # Score based on column types matching intent if intent["temporal_focus"]: temporal_columns = [ col for col, info in columns.items() if "datetime" in info.get("data_type", "").lower() ] score += len(temporal_columns) * 0.5 if intent["aggregation_focus"]: numeric_columns = [ col for col, info in columns.items() if any(t in info.get("data_type", "").lower() for t in ['int', 'real', 'decimal']) ] score += len(numeric_columns) * 0.3 # Score based on table name relevance table_lower = table.lower() for mentioned in intent["mentioned_columns"]: if mentioned.lower() in table_lower: score += 2.0 break # Score based on recent usage (if schema has successful queries) if schema.get("successful_queries"): score += 1.0 return score def _create_compact_token(self, schema: Dict[str, Any], table: str) -> str: """Create compact schema token for context.""" # Use existing ai_token if available, otherwise create minimal one if schema.get("ai_token"): return schema["ai_token"] # Create minimal token columns = schema.get("columns", {}) column_count = len(columns) # Create compact representation compact_columns = [] for col_name, col_info in list(columns.items())[:5]: # Max 5 columns col_type = col_info.get("data_type", "unknown") compact_columns.append(f"{col_name}({col_type})") token = ( f"{SPECIAL_TOKENS['TABLE_START']}{table}{SPECIAL_TOKENS['TABLE_END']}" f"{SPECIAL_TOKENS['SUMMARY_START']}{column_count}_cols{SPECIAL_TOKENS['SUMMARY_END']}" f"[{','.join(compact_columns)}]" ) return token class MemoryManager: """ Simplified Memory Manager implementing 2-step flow: 1. Execute KQL query first and show data 2. Background schema discovery and AI context preparation """ def __init__(self, custom_memory_path: Optional[str] = None): """Initialize memory manager with AI-friendly token system and thread safety.""" self.memory_path = self._get_memory_path(custom_memory_path) self.memory_path.parent.mkdir(parents=True, exist_ok=True) self.corpus = self._load_or_create_corpus() self._save_scheduled = False self._memory_size_limit = 500 * 1024 # 500KB limit per cluster self._compression_enabled = True def _get_memory_path(self, custom_path: Optional[str] = None) -> Path: """Get the path for unified schema memory.""" if custom_path: base_dir = Path(custom_path) elif os.name == "nt": # Windows appdata = os.environ.get("APPDATA", "") if not appdata: # Fallback to user profile if APPDATA not available base_dir = Path.home() / "AppData" / "Roaming" / "KQL_MCP" else: base_dir = Path(appdata) / "KQL_MCP" else: # macOS/Linux base_dir = Path.home() / ".local" / "share" / "KQL_MCP" # Log the memory path for debugging memory_path = base_dir / "unified_memory.json" logger.info(f"Schema memory path: {memory_path}") return memory_path def _load_or_create_corpus(self) -> Dict[str, Any]: """Load existing corpus or create a new one if loading fails or file doesn't exist.""" if self.memory_path.exists(): try: with open(self.memory_path, "r", encoding="utf-8") as f: corpus = json.load(f) logger.info(f"Loaded memory from {self.memory_path}") return self._ensure_corpus_structure(corpus) except Exception as e: logger.error(f"Failed to load memory from {self.memory_path}: {e}. A new corpus will be created.") # This block runs if the file doesn't exist or if loading failed. logger.info("Creating new schema memory corpus.") corpus = self._create_empty_corpus() # Temporarily assign to self.corpus so save_corpus() can access it. # The final assignment happens in __init__ after this function returns. self.corpus = corpus try: self.save_corpus() logger.info(f"Successfully created and saved new corpus at {self.memory_path}") except Exception as e: # Log error but proceed, as the corpus is in memory. logger.error(f"Failed to perform initial save of new corpus: {e}") return corpus def _create_empty_corpus(self) -> Dict[str, Any]: """Create empty corpus structure with enhanced AI-friendly organization.""" return { "version": "3.0", "created": datetime.now().isoformat(), "last_updated": datetime.now().isoformat(), "clusters": {}, # cluster_uri -> meta + databases -> tables -> schema + successful_queries } def _ensure_corpus_structure(self, corpus: Dict[str, Any]) -> Dict[str, Any]: """Ensure corpus has required structure and migrate if needed.""" required_sections = { "version": "3.0", "clusters": {}, } for section, default in required_sections.items(): if section not in corpus: corpus[section] = default # Handle version migration current_version = corpus.get("version", "3.0") if current_version in ["3.0", "2.0", "1.0"]: corpus = self._migrate_to_v31(corpus) # Update version and timestamp corpus["version"] = "3.0" corpus["last_updated"] = datetime.now().isoformat() return corpus def _migrate_to_v31(self, corpus: Dict[str, Any]) -> Dict[str, Any]: """Migrate corpus from older versions to v3.1 with meta sections.""" try: migrated_clusters = {} for cluster_uri, cluster_data in corpus.get("clusters", {}).items(): # Ensure cluster has meta section if isinstance(cluster_data, dict): if "meta" not in cluster_data: cluster_data["meta"] = { "token": f"{SPECIAL_TOKENS['CLUSTER_START']}{self._extract_cluster_name(cluster_uri)}{SPECIAL_TOKENS['CLUSTER_END']}", "description": f"Cluster {cluster_uri}", "last_accessed": datetime.now().isoformat() } # Migrate databases databases = cluster_data.get("databases", {}) migrated_databases = {} for db_name, db_data in databases.items(): if isinstance(db_data, dict): # Ensure database has meta section if "meta" not in db_data: tables_count = len(db_data.get("tables", {})) db_data["meta"] = { "token": f"{SPECIAL_TOKENS['DATABASE_START']}{db_name}{SPECIAL_TOKENS['DATABASE_END']}", "description": f"Database {db_name}", "table_count": tables_count } # Migrate tables to new structure tables = db_data.get("tables", {}) migrated_tables = {} for table_name, table_data in tables.items(): if isinstance(table_data, dict): # Convert old schema format to new structure if "meta" not in table_data: table_data["meta"] = { "token": f"{SPECIAL_TOKENS['TABLE_START']}{table_name}{SPECIAL_TOKENS['TABLE_END']}", "summary": f"{SPECIAL_TOKENS['SUMMARY_START']}{self._generate_table_summary(table_name, {})}{SPECIAL_TOKENS['SUMMARY_END']}", "discovered_at": table_data.get("discovered_at", datetime.now().isoformat()), "last_updated": table_data.get("last_updated", datetime.now().isoformat()) } # Ensure successful_queries exists table_data.setdefault("successful_queries", []) migrated_tables[table_name] = table_data db_data["tables"] = migrated_tables migrated_databases[db_name] = db_data cluster_data["databases"] = migrated_databases migrated_clusters[cluster_uri] = cluster_data corpus["clusters"] = migrated_clusters logger.info("Successfully migrated corpus to v3.1 structure") except Exception as e: logger.warning(f"Migration to v3.1 failed: {e}, using current structure") return corpus @lru_cache(maxsize=50) def get_schema(self, cluster_uri: str, database: str, table: str, enable_fallback: bool = True) -> Dict[str, Any]: """ Get schema for a specific table using the new structure with fallback strategies. Args: cluster_uri: The cluster URI database: Database name table: Table name enable_fallback: Whether to enable fallback strategies if primary schema fails Returns: Schema dictionary with fallback strategies applied if needed """ try: normalized_cluster = self._normalize_cluster_uri(cluster_uri) # Navigate the new structure cluster_data = self.corpus.get("clusters", {}).get(normalized_cluster, {}) db_data = cluster_data.get("databases", {}).get(database, {}) table_data = db_data.get("tables", {}).get(table, {}) # Extract schema from the new structure schema_data = table_data.get("schema", {}) # Use unified schema.columns format only - no redundant column_types storage # This eliminates duplicate schema storage and ensures single source of truth # If we have a valid schema, return it if schema_data and "columns" in schema_data: return schema_data # Apply fallback strategies if enabled and no valid schema found if enable_fallback: return self._apply_schema_fallback_strategies(normalized_cluster, database, table) return schema_data except Exception as e: logger.warning( f"Failed to get schema for {cluster_uri}/{database}/{table}: {e}" ) # Apply fallback strategies on exception if enable_fallback: try: return self._apply_schema_fallback_strategies(cluster_uri, database, table) except Exception as fallback_error: logger.error(f"All schema fallback strategies failed: {fallback_error}") return {} async def get_schema_async(self, cluster_uri: str, database: str, table: str, enable_fallback: bool = True) -> Dict[str, Any]: """ Async wrapper around the sync `get_schema` to allow non-blocking calls from async code. Uses the existing lru_cache-backed get_schema for fast lookups. """ import asyncio loop = asyncio.get_running_loop() return await loop.run_in_executor(None, self.get_schema, cluster_uri, database, table, enable_fallback) def store_schema( self, cluster_uri: str, database: str, table: str, schema_data: Dict[str, Any], samples: Optional[Dict[str, Any]] = None ): """Store schema with the new AI-optimized structure and thread safety.""" self._lock = _memory_lock self._lock.acquire() try: # DEBUG: Log incoming schema shape to help diagnose missing schema storage try: schema_type = type(schema_data).__name__ schema_keys = list(schema_data.keys()) if isinstance(schema_data, dict) else None except Exception: schema_type = str(type(schema_data)) schema_keys = None logger.debug( "store_schema called for %s/%s/%s - schema_type=%s, schema_keys=%s, samples_present=%s", cluster_uri, database, table, schema_type, schema_keys, bool(samples), ) normalized_cluster = self._normalize_cluster_uri(cluster_uri) # Apply dynamic compression before storing if self._compression_enabled: schema_data = self._compress_schema_data(schema_data) # Minimal pre-filter: only skip obviously invalid data, avoid rejecting valid schemas try: # Only skip if ALL columns are literally "Error" (case-insensitive) - very conservative check cols_check: List[str] = [] if isinstance(schema_data, dict): if isinstance(schema_data.get("columns"), list) and schema_data.get("columns"): cols_check = [ c if isinstance(c, str) else (c.get("name") or c.get("column") or "") for c in schema_data.get("columns", []) ] elif isinstance(schema_data.get("column_types"), dict) and schema_data.get("column_types"): cols_check = list(schema_data.get("column_types").keys()) # Only skip if EXACTLY one column named exactly "Error" with no other data if (cols_check and len(cols_check) == 1 and str(cols_check[0]).strip().lower() == "error" and not any(schema_data.get(k) for k in ["table_name", "discovered_at", "cluster", "database"])): logger.debug(f"Skipping schema store for {cluster_uri}/{database}/{table}: detected single 'Error' column with no metadata") return # Very conservative sample checking - only skip obvious error messages in samples if isinstance(samples, dict): error_sample_count = 0 total_samples = 0 for v in samples.values(): if v is not None: total_samples += 1 try: if isinstance(v, str) and re.search( r"^(kusto service error|semantic error|failed to execute kql):", str(v).strip(), flags=re.IGNORECASE, ): error_sample_count += 1 except Exception: continue # Only skip if ALL samples are error messages (and we have samples) if total_samples > 0 and error_sample_count == total_samples: logger.debug(f"Skipping schema store for {cluster_uri}/{database}/{table}: all samples are error messages") return except Exception as _preerr: logger.debug(f"Schema store pre-filter check failed: {_preerr}") # Ensure cluster structure if normalized_cluster not in self.corpus["clusters"]: self.corpus["clusters"][normalized_cluster] = { "meta": { "token": f"{SPECIAL_TOKENS['CLUSTER_START']}{self._extract_cluster_name(normalized_cluster)}{SPECIAL_TOKENS['CLUSTER_END']}", "description": f"Cluster {normalized_cluster}", "last_accessed": datetime.now().isoformat() }, "databases": {} } cluster_data = self.corpus["clusters"][normalized_cluster] # Ensure database structure if database not in cluster_data["databases"]: cluster_data["databases"][database] = { "meta": { "token": f"{SPECIAL_TOKENS['DATABASE_START']}{database}{SPECIAL_TOKENS['DATABASE_END']}", "description": f"Database {database}", "table_count": 0 }, "tables": {} } db_data = cluster_data["databases"][database] # Process columns from schema_data columns = {} column_tokens = [] # Handle both legacy columns list, dict-based columns mapping, and new column_types format incoming_columns = [] cols_obj = schema_data.get("columns") # Case A: legacy list of columns (strings or dicts) if isinstance(cols_obj, list): for col in cols_obj: if isinstance(col, str): incoming_columns.append({"name": col, "type": "unknown", "description": "", "tags": [], "sample_values": []}) elif isinstance(col, dict): col_name = col.get("name") or col.get("column") or "" if col_name: incoming_columns.append({ "name": col_name, "type": col.get("type") or col.get("datatype") or "unknown", "description": col.get("description") or col.get("desc") or "", "tags": col.get("tags") or [], "sample_values": col.get("sample_values") or col.get("examples") or [] }) # Case B: dict mapping of column_name -> metadata (common new shape) elif isinstance(cols_obj, dict): for col_name, info in cols_obj.items(): if isinstance(info, dict): incoming_columns.append({ "name": col_name, "type": info.get("data_type") or info.get("type") or info.get("ColumnType") or "unknown", "description": info.get("description") or info.get("desc") or "", "tags": info.get("tags") or info.get("column_tags") or [], "sample_values": list(info.get("sample_values") or info.get("examples") or []) }) else: # simple value mapping - treat as unknown type with provided value ignored incoming_columns.append({ "name": col_name, "type": "unknown", "description": "", "tags": [], "sample_values": [] }) # Case C: older 'column_types' mapping elif isinstance(schema_data.get("column_types"), dict): for col_name, info in schema_data.get("column_types", {}).items(): incoming_columns.append({ "name": col_name, "type": info.get("data_type") or info.get("type") or "unknown", "description": info.get("description", "") or "", "tags": info.get("tags") or [], "sample_values": list(info.get("sample_values") or []) }) # Process each column and create enhanced tokens for col_data in incoming_columns: col_name = col_data["name"] col_type = col_data["type"] col_description = col_data["description"] or self._generate_ai_description(col_name, col_type, table) col_tags = col_data["tags"] col_samples = col_data["sample_values"][:3] # Limit to 3 samples # Merge samples if provided if samples and isinstance(samples, dict) and col_name in samples: val = samples.get(col_name) if val is not None: sv = str(val) if not re.search(r"Kusto service error:|Semantic error:|Failed to execute KQL", sv, flags=re.IGNORECASE): if sv not in col_samples: col_samples.insert(0, sv) # Generate column token (ensure all samples are strings) sample_strs = [str(s) for s in col_samples[:2]] col_token = ( f"{SPECIAL_TOKENS['COLUMN_START']}{col_name}" f"{SPECIAL_TOKENS['TYPE_START']}{col_type}{SPECIAL_TOKENS['TYPE_END']}" f"{SPECIAL_TOKENS['DESCRIPTION_START']}{col_description}{SPECIAL_TOKENS['DESCRIPTION_END']}" f"{SPECIAL_TOKENS['TAGS_START']}{','.join(col_tags)}{SPECIAL_TOKENS['TAGS_END']}" f"{SPECIAL_TOKENS['SAMPLES_START']}{','.join(sample_strs)}{SPECIAL_TOKENS['SAMPLES_END']}" f"{SPECIAL_TOKENS['COLUMN_END']}" ) columns[col_name] = { "token": col_token, "data_type": col_type, "description": col_description, "tags": col_tags, "sample_values": col_samples } column_tokens.append(col_token) # Create table AI token table_token = ( f"{SPECIAL_TOKENS['CLUSTER_START']}{self._extract_cluster_name(normalized_cluster)}{SPECIAL_TOKENS['CLUSTER_END']}" f"{SPECIAL_TOKENS['DATABASE_START']}{database}{SPECIAL_TOKENS['DATABASE_END']}" f"{SPECIAL_TOKENS['TABLE_START']}{table}{SPECIAL_TOKENS['TABLE_END']}" f"{SPECIAL_TOKENS['SUMMARY_START']}{self._generate_table_summary(table, columns)}{SPECIAL_TOKENS['SUMMARY_END']}" f"{''.join(column_tokens[:10])}" # Limit to 10 columns ) # Check if table already exists to preserve successful_queries existing_table = db_data["tables"].get(table, {}) existing_queries = existing_table.get("successful_queries", []) # Overwrite schema to ensure it's always up-to-date, but preserve successful queries. db_data["tables"][table] = { "meta": { "token": f"{SPECIAL_TOKENS['TABLE_START']}{table}{SPECIAL_TOKENS['TABLE_END']}", "summary": f"{SPECIAL_TOKENS['SUMMARY_START']}{self._generate_table_summary(table, columns)}{SPECIAL_TOKENS['SUMMARY_END']}", "discovered_at": datetime.now().isoformat(), "last_updated": datetime.now().isoformat() }, "schema": { "columns": columns, "ai_token": table_token }, "successful_queries": existing_queries # Preserve existing queries } # Update table count and table list for database-level queries (merge with existing) try: existing_list = db_data["meta"].get("table_list", []) or [] except Exception: existing_list = [] merged_list = list(dict.fromkeys(existing_list + list(db_data["tables"].keys()))) db_data["meta"]["table_count"] = len(db_data["tables"]) try: db_data["meta"]["table_list"] = merged_list except Exception: # Fallback if tables structure is unexpected db_data["meta"].setdefault("table_list", merged_list) # Check memory limits and apply compression if needed if self._should_compress_cluster_data(normalized_cluster): self._compress_cluster_data(normalized_cluster) # Schedule save (background) and perform an immediate save to ensure persistence. # The background save batches multiple updates, but an immediate save here prevents # a race where the process exits or another instance overwrites the file before # the background thread runs. self._schedule_save() try: self.save_corpus() except Exception as e: logger.debug(f"Immediate save failed (will rely on scheduled save): {e}") # Clear cached get_schema results try: MemoryManager.get_schema.cache_clear() except Exception: pass logger.debug(f"Stored enhanced schema for {normalized_cluster}/{database}/{table}") except Exception as e: logger.error(f"Failed to store schema for {cluster_uri}/{database}/{table}: {e}") raise finally: self._lock.release() def get_database_schema(self, cluster: str, database: str) -> Dict[str, Any]: """Gets a database schema (list of tables) from the corpus using new structure.""" try: normalized_cluster = self._normalize_cluster_uri(cluster) db_data = self.corpus.get("clusters", {}).get(normalized_cluster, {}).get("databases", {}).get(database, {}) # Extract table list from meta section; fall back to actual stored tables if needed meta = db_data.get("meta", {}) tables = meta.get("table_list", []) # Fallback: if meta.table_list is empty, derive from stored table entries if not tables: try: tables = list(db_data.get("tables", {}).keys()) except Exception: tables = [] # Return in expected format return { "tables": tables, "table_count": meta.get("table_count", len(tables)), "last_discovered": meta.get("last_discovered"), "discovery_method": meta.get("discovery_method", "unknown"), "schema_version": meta.get("schema_version", "3.0") } except Exception as e: logger.warning(f"Failed to get database schema for {cluster}/{database}: {e}") return {} def store_database_schema(self, cluster: str, database: str, db_schema_data: Dict[str, Any]): """Stores a database schema object with enhanced metadata using new structure.""" try: # DEBUG: Log incoming database schema details for diagnostics try: table_count = len(db_schema_data.get("tables", [])) if isinstance(db_schema_data, dict) else None except Exception: table_count = None logger.debug( "store_database_schema called for %s/%s - tables_count=%s, keys=%s", cluster, database, table_count, list(db_schema_data.keys()) if isinstance(db_schema_data, dict) else None, ) normalized_cluster = self._normalize_cluster_uri(cluster) # Ensure cluster structure with meta section if normalized_cluster not in self.corpus["clusters"]: self.corpus["clusters"][normalized_cluster] = { "meta": { "token": f"{SPECIAL_TOKENS['CLUSTER_START']}{self._extract_cluster_name(normalized_cluster)}{SPECIAL_TOKENS['CLUSTER_END']}", "description": f"Cluster {normalized_cluster}", "last_accessed": datetime.now().isoformat() }, "databases": {} } cluster_data = self.corpus["clusters"][normalized_cluster] # Ensure database structure with meta section if database not in cluster_data["databases"]: cluster_data["databases"][database] = { "meta": { "token": f"{SPECIAL_TOKENS['DATABASE_START']}{database}{SPECIAL_TOKENS['DATABASE_END']}", "description": f"Database {database}", "table_count": 0 }, "tables": {} } # Update database meta with table count (merge with existing to avoid overwriting previously discovered tables) table_list = db_schema_data.get("tables", []) existing_meta = cluster_data["databases"][database].setdefault("meta", {}) existing_table_list = existing_meta.get("table_list", []) or [] # Merge table lists without removing previously discovered tables merged_tables = list(dict.fromkeys(existing_table_list + list(table_list))) cluster_data["databases"][database]["meta"]["table_count"] = len(merged_tables) # Store table list information (for database-level queries like ".show tables") cluster_data["databases"][database]["meta"]["table_list"] = merged_tables cluster_data["databases"][database]["meta"]["last_discovered"] = datetime.now().isoformat() cluster_data["databases"][database]["meta"]["discovery_method"] = "live_schema_discovery" cluster_data["databases"][database]["meta"]["schema_version"] = "3.0" # Schedule background save and perform immediate save to persist DB-level schema changes. self._schedule_save() try: self.save_corpus() except Exception as e: logger.debug(f"Immediate DB schema save failed (will rely on scheduled save): {e}") logger.info(f"Stored enhanced database schema for {cluster}/{database}: {len(table_list)} tables") except Exception as e: logger.error(f"Failed to store database schema: {e}") def add_successful_query(self, cluster_uri: str, database: str, table: str, kql: str, description: str): """Add a successful KQL query to the specific table in memory.""" try: normalized = self._normalize_cluster_uri(cluster_uri) # Ensure cluster exists if normalized not in self.corpus["clusters"]: self.corpus["clusters"][normalized] = { "meta": { "token": f"{SPECIAL_TOKENS['CLUSTER_START']}{self._extract_cluster_name(normalized)}{SPECIAL_TOKENS['CLUSTER_END']}", "description": f"Cluster {normalized}", "last_accessed": datetime.now().isoformat() }, "databases": {} } cluster_data = self.corpus["clusters"][normalized] # Ensure database exists if database not in cluster_data["databases"]: cluster_data["databases"][database] = { "meta": { "token": f"{SPECIAL_TOKENS['DATABASE_START']}{database}{SPECIAL_TOKENS['DATABASE_END']}", "description": f"Database {database}", "table_count": 0 }, "tables": {} } db_data = cluster_data["databases"][database] # Ensure table exists if table not in db_data["tables"]: db_data["tables"][table] = { "meta": { "token": f"{SPECIAL_TOKENS['TABLE_START']}{table}{SPECIAL_TOKENS['TABLE_END']}", "summary": f"{SPECIAL_TOKENS['SUMMARY_START']}{self._generate_table_summary(table, {})}{SPECIAL_TOKENS['SUMMARY_END']}", "discovered_at": datetime.now().isoformat(), "last_updated": datetime.now().isoformat() }, "schema": { "columns": {}, "ai_token": "" }, "successful_queries": [] } table_data = db_data["tables"][table] # Add the successful query query_entry = { "query": kql, "description": description, "timestamp": datetime.now().isoformat(), "token": f"{SPECIAL_TOKENS['QUERY_START']}{self._generate_query_token(kql)}{SPECIAL_TOKENS['QUERY_END']}" } # Add to successful_queries list table_data.setdefault("successful_queries", []).append(query_entry) # Limit to last 10 queries to prevent memory bloat if len(table_data["successful_queries"]) > 10: table_data["successful_queries"] = table_data["successful_queries"][-10:] # Update table's last_updated timestamp table_data["meta"]["last_updated"] = datetime.now().isoformat() # Schedule save self._schedule_save() except Exception as e: logger.warning(f"Failed to add successful query: {e}") def add_global_successful_query(self, cluster_uri: str, database: str, kql: str, description: str): """Add a successful KQL query to global storage when table association is not available.""" try: normalized = self._normalize_cluster_uri(cluster_uri) # Ensure cluster exists if normalized not in self.corpus["clusters"]: self.corpus["clusters"][normalized] = { "meta": { "token": f"{SPECIAL_TOKENS['CLUSTER_START']}{self._extract_cluster_name(normalized)}{SPECIAL_TOKENS['CLUSTER_END']}", "description": f"Cluster {normalized}", "last_accessed": datetime.now().isoformat() }, "databases": {} } cluster_data = self.corpus["clusters"][normalized] # Ensure database exists if database not in cluster_data["databases"]: cluster_data["databases"][database] = { "meta": { "token": f"{SPECIAL_TOKENS['DATABASE_START']}{database}{SPECIAL_TOKENS['DATABASE_END']}", "description": f"Database {database}", "table_count": 0 }, "tables": {} } # Add to cluster-level successful queries (global storage) if "successful_queries" not in cluster_data: cluster_data["successful_queries"] = [] query_entry = { "query": kql, "description": description, "database": database, "timestamp": datetime.now().isoformat(), "token": f"{SPECIAL_TOKENS['QUERY_START']}{self._generate_query_token(kql)}{SPECIAL_TOKENS['QUERY_END']}" } cluster_data["successful_queries"].append(query_entry) # Limit to last 20 global queries to prevent memory bloat if len(cluster_data["successful_queries"]) > 20: cluster_data["successful_queries"] = cluster_data["successful_queries"][-20:] # Schedule save self._schedule_save() logger.debug(f"Added global successful query for {database}: {description}") except Exception as e: logger.warning(f"Failed to add global successful query: {e}") async def validate_query( self, query: str, cluster_uri: str, database: str ) -> ValidationResult: """ Validate a KQL query against cached schema Args: query: KQL query to validate cluster_uri: Cluster URI database: Database name Returns: ValidationResult with validation details """ errors = [] warnings = [] suggestions = [] tables_used = set() columns_used = {} # Normalize query for analysis normalized_query = query.strip() # Extract table references table_refs = self._extract_table_references(normalized_query) # Get cached schema - FIX: Use proper memory structure access schema = await self._get_schema_for_validation(cluster_uri, database) if not schema: warnings.append(f"No cached schema found for {database} on {cluster_uri}. Validation limited.") return ValidationResult( is_valid=True, # Don't block if no schema errors=errors, warnings=warnings, suggestions=suggestions, validated_query=query, tables_used=tables_used, columns_used=columns_used ) # Validate table references for table_ref in table_refs: table_name = table_ref.get('table', '') if table_name: validation = self._validate_table_reference(table_name, schema, database) if validation['error']: errors.append(validation['error']) if validation['suggestion']: suggestions.append(validation['suggestion']) if validation['valid_table']: tables_used.add(validation['valid_table']) # Validate column references column_validations = self._validate_column_references( normalized_query, schema, tables_used ) errors.extend(column_validations['errors']) warnings.extend(column_validations['warnings']) suggestions.extend(column_validations['suggestions']) columns_used = column_validations['columns_used'] # Validate data types in operations type_validations = self._validate_data_types( normalized_query, schema, tables_used, columns_used ) errors.extend(type_validations['errors']) warnings.extend(type_validations['warnings']) suggestions.extend(type_validations['suggestions']) # Apply query corrections if possible corrected_query = self._apply_corrections( normalized_query, column_validations.get('corrections', {}), type_validations.get('corrections', {}) ) # Validate KQL syntax patterns syntax_validations = self._validate_syntax_patterns(corrected_query) errors.extend(syntax_validations['errors']) warnings.extend(syntax_validations['warnings']) return ValidationResult( is_valid=len(errors) == 0, errors=errors, warnings=warnings, suggestions=suggestions, validated_query=corrected_query, tables_used=tables_used, columns_used=columns_used ) def _extract_table_references(self, query: str) -> List[Dict[str, str]]: """ Extract table references from query using the existing parse_query_entities function Args: query: KQL query Returns: List of table reference dictionaries """ try: from .utils import parse_query_entities entities = parse_query_entities(query) tables = entities.get('tables', []) return [{'table': table} for table in tables] except Exception as e: logger.warning(f"Error extracting table references: {e}") return [] async def _get_schema_for_validation( self, cluster_uri: str, database: str ) -> Optional[Dict[str, Any]]: """ Get schema from memory for validation - FIXED to use new memory structure Args: cluster_uri: Cluster URI database: Database name Returns: Schema dictionary or None """ try: # Use the new memory structure to get schema normalized_cluster = self._normalize_cluster_uri(cluster_uri) cluster_data = self.corpus.get("clusters", {}).get(normalized_cluster, {}) db_data = cluster_data.get("databases", {}).get(database, {}) if not db_data: logger.debug(f"No database data found for {database} in {normalized_cluster}") return None # Build schema structure from tables tables_data = db_data.get("tables", {}) if not tables_data: logger.debug(f"No tables found in {database}") return None # Convert to validation-compatible format schema = { "tables": {} } for table_name, table_data in tables_data.items(): if isinstance(table_data, dict): # Check if table has schema data if "schema" in table_data: table_schema = table_data["schema"] columns = table_schema.get("columns", {}) else: # Fallback: check if columns are directly in table_data columns = table_data.get("columns", {}) if columns: # Convert columns to validation format schema["tables"][table_name] = { "columns": columns } logger.debug(f"Found {len(columns)} columns for table {table_name}") if schema["tables"]: logger.debug(f"Retrieved schema for validation: {len(schema['tables'])} tables") return schema else: logger.debug(f"No valid schemas found for validation in {database}") return None except Exception as e: logger.warning(f"Error accessing schema for validation: {e}") return None def _validate_table_reference( self, table_name: str, schema: Dict[str, Any], database: str ) -> Dict[str, Any]: """ Validate a table reference Args: table_name: Table name to validate schema: Database schema database: Database name Returns: Validation result dictionary """ result = { 'error': None, 'suggestion': None, 'valid_table': None } tables = schema.get('tables', {}) # Exact match if table_name in tables: result['valid_table'] = table_name return result # Case-insensitive match from .utils import normalize_name normalized_table = normalize_name(table_name) for actual_table in tables: if normalize_name(actual_table) == normalized_table: result['valid_table'] = actual_table result['suggestion'] = f"Table '{table_name}' should be '{actual_table}' (case-sensitive)" return result # Find similar tables similar = self._find_similar_names(table_name, list(tables.keys())) if similar: result['error'] = f"Table '{table_name}' not found in {database}" result['suggestion'] = f"Did you mean: {', '.join(similar[:3])}?" else: result['error'] = f"Table '{table_name}' not found in {database}" return result def _validate_column_references( self, query: str, schema: Dict[str, Any], tables_used: Set[str] ) -> Dict[str, Any]: """ Validate column references in the query Args: query: KQL query schema: Database schema tables_used: Set of tables used in query Returns: Validation results with errors, warnings, suggestions """ errors = [] warnings = [] suggestions = [] columns_used = {} corrections = {} # Extract column references using enhanced logic found_columns = self._extract_columns_from_query(query, list(tables_used)) # Validate each column tables = schema.get('tables', {}) for col in found_columns: validated = False # Check in each used table for table in tables_used: if table in tables: table_schema = tables[table] columns = table_schema.get('columns', {}) # Exact match if col in columns: if table not in columns_used: columns_used[table] = set() columns_used[table].add(col) validated = True break # Case-insensitive match from .utils import normalize_name for actual_col in columns: if normalize_name(actual_col) == normalize_name(col): if table not in columns_used: columns_used[table] = set() columns_used[table].add(actual_col) corrections[col] = actual_col suggestions.append(f"Column '{col}' should be '{actual_col}' (case-sensitive)") validated = True break if not validated and tables_used: # Find similar columns all_columns = [] for table in tables_used: if table in tables: all_columns.extend(tables[table].get('columns', {}).keys()) similar = self._find_similar_names(col, all_columns) if similar: errors.append(f"Column '{col}' not found") suggestions.append(f"Did you mean: {', '.join(similar[:3])}?") else: errors.append(f"Column '{col}' not found in tables: {', '.join(tables_used)}") return { 'errors': errors, 'warnings': warnings, 'suggestions': suggestions, 'columns_used': columns_used, 'corrections': corrections } def _validate_data_types( self, query: str, schema: Dict[str, Any], tables_used: Set[str], columns_used: Dict[str, Set[str]] ) -> Dict[str, Any]: """ Validate data type compatibility in operations Args: query: KQL query schema: Database schema tables_used: Tables used in query columns_used: Columns used per table Returns: Validation results """ errors = [] warnings = [] suggestions = [] corrections = {} # Check type compatibility in common operations type_patterns = [ (r'(\w+)\s*==\s*"([^"]+)"', 'string_comparison'), (r'(\w+)\s*==\s*(\d+)', 'numeric_comparison'), (r'(\w+)\s*>\s*(\d+)', 'numeric_operation'), (r'(\w+)\s*<\s*(\d+)', 'numeric_operation'), (r'sum\s*\(\s*(\w+)\s*\)', 'aggregation'), (r'avg\s*\(\s*(\w+)\s*\)', 'aggregation'), ] tables = schema.get('tables', {}) for pattern, op_type in type_patterns: matches = re.finditer(pattern, query, re.IGNORECASE) for match in matches: column = match.group(1) # Find column type column_type = None from .utils import normalize_name for table, cols in columns_used.items(): if column in cols or normalize_name(column) in [normalize_name(c) for c in cols]: if table in tables: table_cols = tables[table].get('columns', {}) for col_name, col_info in table_cols.items(): if normalize_name(col_name) == normalize_name(column): column_type = col_info.get('data_type', col_info.get('type', 'unknown')) break if column_type: # Validate based on operation type if op_type == 'string_comparison' and column_type not in ['string', 'dynamic']: warnings.append( f"Column '{column}' is type '{column_type}' but used in string comparison" ) suggestions.append(f"Consider using tostring({column}) for explicit conversion") elif op_type == 'numeric_operation' and column_type not in ['int', 'long', 'real', 'decimal']: errors.append( f"Column '{column}' is type '{column_type}' but used in numeric operation" ) if column_type == 'string': suggestions.append(f"Consider using toint({column}) or toreal({column})") elif op_type == 'aggregation' and column_type not in ['int', 'long', 'real', 'decimal']: errors.append( f"Cannot aggregate column '{column}' of type '{column_type}'" ) return { 'errors': errors, 'warnings': warnings, 'suggestions': suggestions, 'corrections': corrections } def _validate_syntax_patterns(self, query: str) -> Dict[str, List[str]]: """ Validate common KQL syntax patterns Args: query: KQL query Returns: Dictionary with errors and warnings """ errors = [] warnings = [] # Check for common syntax issues syntax_checks = [ (r'\|\s*\|', "Double pipe operator detected"), (r'where\s+and\s+', "Invalid WHERE AND syntax"), (r'summarize\s+by\s*$', "Summarize by clause is empty"), (r'project\s*$', "Project clause is empty"), (r'join\s+kind\s*=', "Join kind syntax should be 'join kind=inner' not 'join kind ='"), ] for pattern, message in syntax_checks: if re.search(pattern, query, re.IGNORECASE): errors.append(message) # Check for unbalanced parentheses open_parens = query.count('(') close_parens = query.count(')') if open_parens != close_parens: errors.append(f"Unbalanced parentheses: {open_parens} opening, {close_parens} closing") # Check for unbalanced quotes single_quotes = query.count("'") double_quotes = query.count('"') if single_quotes % 2 != 0: errors.append("Unbalanced single quotes") if double_quotes % 2 != 0: errors.append("Unbalanced double quotes") # Warn about deprecated syntax deprecated_patterns = [ (r'sort\s+by', "Use 'order by' instead of 'sort by' (deprecated)"), (r'limit\s+\d+', "Use 'take' instead of 'limit' (deprecated)"), ] for pattern, message in deprecated_patterns: if re.search(pattern, query, re.IGNORECASE): warnings.append(message) return { 'errors': errors, 'warnings': warnings } def _apply_corrections( self, query: str, column_corrections: Dict[str, str], type_corrections: Dict[str, str] ) -> str: """ Apply automatic corrections to the query Args: query: Original query column_corrections: Column name corrections type_corrections: Type conversion corrections Returns: Corrected query """ corrected = query # Apply column name corrections for wrong_name, correct_name in column_corrections.items(): # Use word boundary to avoid partial replacements pattern = rf'\b{re.escape(wrong_name)}\b' corrected = re.sub(pattern, correct_name, corrected, flags=re.IGNORECASE) # Apply type corrections for col, conversion in type_corrections.items(): corrected = corrected.replace(col, conversion) return corrected def _find_similar_names(self, target: str, candidates: List[str], max_results: int = 3) -> List[str]: """ Find similar names using simple string similarity. Args: target: Target name to find similarities for candidates: List of candidate names max_results: Maximum number of results to return Returns: List of similar names sorted by similarity """ if not target or not candidates: return [] target_lower = target.lower() similarities = [] for candidate in candidates: if not candidate: continue candidate_lower = candidate.lower() # Skip exact matches if candidate_lower == target_lower: continue # Calculate similarity score score = 0.0 # Check if target is substring of candidate or vice versa if target_lower in candidate_lower or candidate_lower in target_lower: score += 0.7 # Check for common prefix prefix_len = 0 for i in range(min(len(target_lower), len(candidate_lower))): if target_lower[i] == candidate_lower[i]: prefix_len += 1 else: break if prefix_len > 0: score += (prefix_len / max(len(target_lower), len(candidate_lower))) * 0.5 # Check for common suffix suffix_len = 0 target_rev = target_lower[::-1] candidate_rev = candidate_lower[::-1] for i in range(min(len(target_rev), len(candidate_rev))): if target_rev[i] == candidate_rev[i]: suffix_len += 1 else: break if suffix_len > 0: score += (suffix_len / max(len(target_lower), len(candidate_lower))) * 0.3 # Simple Levenshtein-like score if len(target_lower) > 2 and len(candidate_lower) > 2: common_chars = set(target_lower) & set(candidate_lower) if common_chars: score += len(common_chars) / max(len(set(target_lower)), len(set(candidate_lower))) * 0.2 if score > 0.1: # Only include if reasonably similar similarities.append((score, candidate)) # Sort by similarity score (descending) and return top results similarities.sort(key=lambda x: x[0], reverse=True) return [name for _, name in similarities[:max_results]] def _extract_columns_from_query(self, query: str, tables: List[str]) -> Set[str]: """ Extract column names referenced in the query with enhanced logic. This includes columns in project, where, extend, summarize clauses and more. Args: query: KQL query string tables: List of table names used in query Returns: Set of column names found in the query """ columns = set() # Import KQL constants try: from .constants import KQL_RESERVED_WORDS, KQL_FUNCTIONS except ImportError: KQL_RESERVED_WORDS = ['where', 'project', 'summarize', 'extend', 'order', 'by', 'take', 'limit'] KQL_FUNCTIONS = ['count', 'sum', 'avg', 'min', 'max'] # Extract bracketed columns [ColumnName] or ['ColumnName'] bracketed_columns = re.findall(r'\[\'?([a-zA-Z0-9_]+)\'?\]', query) columns.update(bracketed_columns) # Extract columns from project clauses - enhanced pattern project_matches = re.finditer(r'\|\s*project\s+([^|]+)', query, re.IGNORECASE) for match in project_matches: project_content = match.group(1).strip() # Split by comma and clean each column for col in project_content.split(','): col = col.strip() # Remove any alias (column = alias or alias = expression) if '=' in col: # Check if it's column = alias or alias = column expression parts = col.split('=') left_part = parts[0].strip() # Take the left part as it's usually the column name col = left_part # Extract clean column name (handle functions and brackets) col = re.sub(r'\(.*?\)', '', col) # Remove function calls col = re.sub(r'\[|\]', '', col) # Remove brackets clean_col = re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*', col.strip()) if clean_col and clean_col.group(0).lower() not in [w.lower() for w in KQL_RESERVED_WORDS]: columns.add(clean_col.group(0)) # Extract columns from where clauses - enhanced pattern where_matches = re.finditer(r'\|\s*where\s+([^|]+)', query, re.IGNORECASE) for match in where_matches: where_content = match.group(1).strip() # Find column names in various conditions col_patterns = [ r'\b([a-zA-Z_][a-zA-Z0-9_]*)\s*(?:==|!=|<=|>=|<|>|contains|startswith|endswith|has|!has)', r'\b([a-zA-Z_][a-zA-Z0-9_]*)\s*(?:in|!in)\s*\(', r'isnotnull\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\)', r'isnull\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\)', r'isempty\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\)', r'isnotempty\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\)' ] for pattern in col_patterns: col_matches = re.findall(pattern, where_content, re.IGNORECASE) for col in col_matches: if col.lower() not in [w.lower() for w in KQL_RESERVED_WORDS]: columns.add(col) # Extract columns from summarize clauses summarize_matches = re.finditer(r'\|\s*summarize\s+([^|]+)', query, re.IGNORECASE) for match in summarize_matches: summarize_content = match.group(1).strip() # Extract from aggregation functions agg_patterns = [ r'(?:count|sum|avg|min|max|stdev|variance)\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\)', r'dcount\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\)', r'countif\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*[^)]*\)' ] for pattern in agg_patterns: agg_matches = re.findall(pattern, summarize_content, re.IGNORECASE) for col in agg_matches: if col.lower() not in [w.lower() for w in KQL_RESERVED_WORDS]: columns.add(col) # Extract from 'by' clause by_match = re.search(r'\bby\s+([^,\|]+)', summarize_content, re.IGNORECASE) if by_match: by_content = by_match.group(1).strip() for col in by_content.split(','): col = col.strip() # Handle functions in by clause col = re.sub(r'\(.*?\)', '', col) clean_col = re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*', col) if clean_col and clean_col.group(0).lower() not in [w.lower() for w in KQL_RESERVED_WORDS]: columns.add(clean_col.group(0)) # Extract columns from extend clauses extend_matches = re.finditer(r'\|\s*extend\s+([^|]+)', query, re.IGNORECASE) for match in extend_matches: extend_content = match.group(1).strip() # Look for column references in expressions col_matches = re.findall(r'\b([a-zA-Z_][a-zA-Z0-9_]*)\b', extend_content) for col in col_matches: if (col.lower() not in [w.lower() for w in KQL_RESERVED_WORDS] and col.lower() not in [w.lower() for w in KQL_FUNCTIONS]): columns.add(col) # Extract columns from order by clauses order_matches = re.finditer(r'\|\s*order\s+by\s+([^|]+)', query, re.IGNORECASE) for match in order_matches: order_content = match.group(1).strip() for col in order_content.split(','): col = col.strip() # Remove asc/desc col = re.sub(r'\s+(asc|desc)$', '', col, flags=re.IGNORECASE) clean_col = re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*', col) if clean_col and clean_col.group(0).lower() not in [w.lower() for w in KQL_RESERVED_WORDS]: columns.add(clean_col.group(0)) # Extract columns from join clauses join_matches = re.finditer(r'\|\s*join\s+(?:kind\s*=\s*\w+\s+)?([^|]+)', query, re.IGNORECASE) for match in join_matches: join_content = match.group(1).strip() # Look for $left.column and $right.column patterns join_col_matches = re.findall(r'\$(?:left|right)\.([a-zA-Z_][a-zA-Z0-9_]*)', join_content) for col in join_col_matches: if col.lower() not in [w.lower() for w in KQL_RESERVED_WORDS]: columns.add(col) # Filter out KQL reserved words and operators reserved_words_lower = {word.lower() for word in KQL_RESERVED_WORDS} columns = {col for col in columns if col.lower() not in reserved_words_lower} return columns def _apply_schema_fallback_strategies(self, cluster_uri: str, database: str, table: str) -> Dict[str, Any]: """ Apply comprehensive schema discovery fallback strategies when live discovery fails. Fallback Strategy Order: 1. cached_schema: Use any available cached schema data 2. query_derived_schema: Derive schema from successful query history 3. minimal_schema: Generate minimal schema with common patterns Args: cluster_uri: Cluster URI database: Database name table: Table name Returns: Schema dictionary from the first successful fallback strategy """ from .constants import ERROR_HANDLING_CONFIG fallback_strategies = ERROR_HANDLING_CONFIG.get("fallback_strategies", [ "cached_schema", "query_derived_schema", "minimal_schema" ]) logger.info(f"Applying schema fallback strategies for {cluster_uri}/{database}/{table}") for strategy in fallback_strategies: try: if strategy == "cached_schema": schema = self._fallback_cached_schema(cluster_uri, database, table) elif strategy == "query_derived_schema": schema = self._fallback_query_derived_schema(cluster_uri, database, table) elif strategy == "minimal_schema": schema = self._fallback_minimal_schema(cluster_uri, database, table) else: logger.warning(f"Unknown fallback strategy: {strategy}") continue if schema and isinstance(schema, dict) and schema.get("columns"): logger.info(f"Fallback strategy '{strategy}' succeeded for {table}") schema["fallback_strategy"] = strategy schema["fallback_applied_at"] = datetime.now().isoformat() return schema except Exception as e: logger.warning(f"Fallback strategy '{strategy}' failed for {table}: {e}") continue # If all fallback strategies fail, return minimal emergency schema logger.error(f"All fallback strategies failed for {table}, using emergency minimal schema") return self._emergency_minimal_schema(table) def _fallback_cached_schema(self, cluster_uri: str, database: str, table: str) -> Optional[Dict[str, Any]]: """ Fallback Strategy 1: Attempt to use any available cached schema data. Searches for: - Partial schema data in the corpus - Similar table schemas in the same database - Historical schema data that might still be valid """ try: normalized_cluster = self._normalize_cluster_uri(cluster_uri) # Check for any partial schema data in the corpus cluster_data = self.corpus.get("clusters", {}).get(normalized_cluster, {}) db_data = cluster_data.get("databases", {}).get(database, {}) table_data = db_data.get("tables", {}).get(table, {}) # Look for any existing schema fragments if table_data and isinstance(table_data, dict): schema_data = table_data.get("schema", {}) if schema_data and "columns" in schema_data: logger.info(f"Found partial cached schema for {table}") return schema_data # Check if we have successful queries that might indicate column structure successful_queries = table_data.get("successful_queries", []) if successful_queries: derived_schema = self._derive_schema_from_queries(successful_queries, table) if derived_schema: logger.info(f"Derived schema from successful queries for {table}") return derived_schema # Look for similar tables in the same database (pattern-based matching) similar_schema = self._find_similar_table_schema(db_data, table) if similar_schema: logger.info(f"Found similar table schema for {table}") return similar_schema return None except Exception as e: logger.warning(f"Cached schema fallback failed: {e}") return None def _fallback_query_derived_schema(self, cluster_uri: str, database: str, table: str) -> Optional[Dict[str, Any]]: """ Fallback Strategy 2: Derive schema from historical successful query patterns. Analyzes successful queries to infer: - Column names from project clauses - Data types from filter operations - Common column patterns from aggregations """ try: normalized_cluster = self._normalize_cluster_uri(cluster_uri) # Collect all successful queries for this table from the corpus cluster_data = self.corpus.get("clusters", {}).get(normalized_cluster, {}) all_queries = [] # Get queries from table-specific successful_queries db_data = cluster_data.get("databases", {}).get(database, {}) table_data = db_data.get("tables", {}).get(table, {}) if table_data: all_queries.extend(table_data.get("successful_queries", [])) # Get queries from cluster-level successful_queries (legacy) cluster_queries = cluster_data.get("successful_queries", []) table_specific_queries = [ q for q in cluster_queries if isinstance(q, dict) and table.lower() in str(q.get("query", "")).lower() ] all_queries.extend(table_specific_queries) if not all_queries: logger.info(f"No query history found for schema derivation of {table}") return None # Analyze queries to derive schema derived_columns = self._analyze_queries_for_schema(all_queries, table) if not derived_columns: return None # Create schema object schema = { "table_name": table, "columns": derived_columns, "discovered_at": datetime.now().isoformat(), "cluster": cluster_uri, "database": database, "derived_from_queries": len(all_queries), "derivation_method": "query_analysis" } logger.info(f"Derived schema for {table} from {len(all_queries)} queries") return schema except Exception as e: logger.warning(f"Query-derived schema fallback failed: {e}") return None def _fallback_minimal_schema(self, cluster_uri: str, database: str, table: str) -> Dict[str, Any]: """Direct KQL-based schema discovery using getschema and take commands.""" try: # Use direct KQL execution to get real schema - no static keywords from .execute_kql import kql_execute_tool # Get schema using | getschema schema_query = f"{table} | getschema" schema_df = kql_execute_tool(schema_query, cluster_uri, database) if schema_df is not None and not schema_df.empty: columns = {} for _, row in schema_df.iterrows(): col_name = row.get("ColumnName", "") col_type = row.get("DataType", row.get("ColumnType", "string")) columns[col_name] = { 'data_type': col_type, 'description': self._generate_ai_description(col_name, col_type, table), 'tags': self._generate_column_tags(col_name, col_type), 'sample_values': [] } # Get sample data using | take 2 sample_query = f"{table} | take 2" sample_df = kql_execute_tool(sample_query, cluster_uri, database) if sample_df is not None and not sample_df.empty: for col_name in columns.keys(): if col_name in sample_df.columns: sample_values = sample_df[col_name].dropna().astype(str).tolist()[:2] columns[col_name]['sample_values'] = sample_values return { "table_name": table, "columns": columns, "discovered_at": datetime.now().isoformat(), "cluster": cluster_uri, "database": database, "column_count": len(columns), "schema_type": "direct_kql_discovery" } return self._emergency_minimal_schema(table) except Exception as e: logger.warning(f"Direct KQL schema discovery failed: {e}") return self._emergency_minimal_schema(table) def _emergency_minimal_schema(self, table: str) -> Dict[str, Any]: """ Emergency fallback: Generate absolute minimal schema to prevent total failure. Creates the most basic schema possible to allow query execution. """ emergency_columns = { 'TimeGenerated': { 'data_type': 'datetime', 'description': 'Timestamp field', 'tags': ['TIME_COLUMN'], 'sample_values': [] }, 'Data': { 'data_type': 'string', 'description': 'Data field', 'tags': ['TEXT'], 'sample_values': [] } } return { "table_name": table, "columns": emergency_columns, "discovered_at": datetime.now().isoformat(), "column_count": len(emergency_columns), "schema_type": "emergency_fallback" } def _derive_schema_from_queries(self, queries: List[Dict[str, Any]], table: str) -> Optional[Dict[str, Any]]: """Derive schema information from successful query patterns.""" column_info = {} for query_entry in queries: query = query_entry.get("query", "") if isinstance(query_entry, dict) else str(query_entry) # Extract columns from project clauses project_matches = re.findall(r'\|\s*project\s+([^|]+)', query, re.IGNORECASE) for match in project_matches: columns = [col.strip() for col in match.split(',')] for col in columns: # Clean column name (remove brackets, aliases, etc.) clean_col = re.sub(r'[\[\]\'"`]', '', col.split(' as ')[0].strip()) if clean_col and not any(op in clean_col.lower() for op in ['(', ')', '+', '-', '*', '/']): column_info[clean_col] = { 'data_type': 'string', # Default type 'description': 'Column derived from query analysis', 'tags': ['DERIVED'], 'sample_values': [] } if column_info: return { "table_name": table, "columns": column_info, "discovered_at": datetime.now().isoformat(), "derivation_method": "query_project_analysis" } return None def _find_similar_table_schema(self, db_data: Dict[str, Any], target_table: str) -> Optional[Dict[str, Any]]: """Find schema from similar tables in the same database.""" try: target_lower = target_table.lower() tables = db_data.get("tables", {}) # Look for tables with similar naming patterns for table_name, table_data in tables.items(): if table_name == target_table: continue table_lower = table_name.lower() # Check for similar prefixes, suffixes, or contained patterns similarity_score = 0 # Same prefix (first 3+ characters) if len(table_lower) > 3 and len(target_lower) > 3: if table_lower[:3] == target_lower[:3]: similarity_score += 1 # Same suffix (last 3+ characters) if len(table_lower) > 3 and len(target_lower) > 3: if table_lower[-3:] == target_lower[-3:]: similarity_score += 1 # Contains common substrings common_patterns = ['event', 'log', 'security', 'audit', 'network', 'process'] for pattern in common_patterns: if pattern in table_lower and pattern in target_lower: similarity_score += 2 # If similarity found, use this schema as a template if similarity_score >= 1: schema_data = table_data.get("schema", {}) if schema_data and "columns" in schema_data: # Copy schema but update metadata similar_schema = schema_data.copy() similar_schema["table_name"] = target_table similar_schema["similarity_source"] = table_name similar_schema["similarity_score"] = similarity_score similar_schema["discovered_at"] = datetime.now().isoformat() logger.info(f"Using schema from similar table {table_name} for {target_table}") return similar_schema return None except Exception as e: logger.warning(f"Similar table schema lookup failed: {e}") return None def _analyze_queries_for_schema(self, queries: List[Dict[str, Any]], table: str) -> Dict[str, Any]: """Analyze query patterns to derive comprehensive schema information.""" columns = {} for query_entry in queries: query = query_entry.get("query", "") if isinstance(query_entry, dict) else str(query_entry) # Extract columns from various KQL operations self._extract_columns_from_project(query, columns) self._extract_columns_from_where(query, columns) self._extract_columns_from_summarize(query, columns) self._extract_columns_from_extend(query, columns) return columns def _extract_columns_from_project(self, query: str, columns: Dict[str, Any]): """Extract column information from project clauses.""" project_patterns = re.findall(r'\|\s*project\s+([^|]+)', query, re.IGNORECASE) for match in project_patterns: column_list = [col.strip() for col in match.split(',')] for col in column_list: # Handle aliases and clean column names if ' as ' in col.lower(): original_col = col.split(' as ')[0].strip() else: original_col = col clean_col = re.sub(r'[\[\]\'"`]', '', original_col) if clean_col and clean_col not in columns: columns[clean_col] = { 'data_type': 'string', 'description': 'Column from project analysis', 'tags': ['PROJECTED'], 'sample_values': [] } def _extract_columns_from_where(self, query: str, columns: Dict[str, Any]): """Extract column information from where clauses with data type hints.""" where_patterns = re.findall(r'\|\s*where\s+([^|]+)', query, re.IGNORECASE) for match in where_patterns: # Look for column comparisons comparisons = re.findall(r'(\w+)\s*(?:==|!=|>=|<=|>|<|contains|has)\s*([^\\s]+)', match) for col_name, value in comparisons: if col_name not in columns: # Infer data type from comparison value data_type = 'string' if value.strip().startswith('datetime('): data_type = 'datetime' elif value.isdigit(): data_type = 'int' elif re.match(r'^\d+\.\d+$', value): data_type = 'real' columns[col_name] = { 'data_type': data_type, 'description': 'Column from where clause analysis', 'tags': ['FILTERED'], 'sample_values': [value] if value not in ['true', 'false', 'null'] else [] } def _extract_columns_from_summarize(self, query: str, columns: Dict[str, Any]): """Extract column information from summarize clauses.""" summarize_patterns = re.findall(r'\|\s*summarize\s+([^|]+)', query, re.IGNORECASE) for match in summarize_patterns: # Extract aggregation columns and group by columns if ' by ' in match.lower(): agg_part, by_part = match.lower().split(' by ', 1) # Process group by columns by_columns = [col.strip() for col in by_part.split(',')] for col in by_columns: clean_col = re.sub(r'[\[\]\'"`]', '', col) if clean_col and clean_col not in columns: columns[clean_col] = { 'data_type': 'string', 'description': 'Group by column', 'tags': ['GROUPING'], 'sample_values': [] } def _extract_columns_from_extend(self, query: str, columns: Dict[str, Any]): """Extract column information from extend clauses.""" extend_patterns = re.findall(r'\|\s*extend\s+([^|]+)', query, re.IGNORECASE) for match in extend_patterns: # Extract new column definitions definitions = [defn.strip() for defn in match.split(',')] for defn in definitions: if '=' in defn: new_col = defn.split('=')[0].strip() clean_col = re.sub(r'[\[\]\'"`]', '', new_col) if clean_col and clean_col not in columns: columns[clean_col] = { 'data_type': 'string', # Extended columns often calculated 'description': 'Extended/calculated column', 'tags': ['CALCULATED'], 'sample_values': [] } def _generate_query_token(self, query: str) -> str: """Generate a simplified token for a query.""" # Remove special characters and normalize normalized = re.sub(r'[^\w\s]', '', query.lower()) # Replace spaces with underscores and limit length token = re.sub(r'\s+', '_', normalized)[:30] return token def get_successful_queries(self, cluster_uri: str, database: str, table: str) -> List[Dict[str, Any]]: """Get successful queries for a specific table.""" try: normalized = self._normalize_cluster_uri(cluster_uri) return (self.corpus .get("clusters", {}) .get(normalized, {}) .get("databases", {}) .get(database, {}) .get("tables", {}) .get(table, {}) .get("successful_queries", [])) except Exception as e: logger.warning(f"Failed to get successful queries: {e}") return [] def store_learning_result(self, query: str, result_data: Dict[str, Any], execution_type: str): """ Store query execution results for learning and future context building. Enhanced with session-based tracking. """ try: # Extract cluster, database, and table information from the query from .execute_kql import extract_cluster_and_database_from_query, extract_tables_from_query cluster_uri, database = extract_cluster_and_database_from_query(query) tables = extract_tables_from_query(query) # No hardcoded defaults - require explicit cluster and database if not cluster_uri: logger.warning("No cluster_uri found in query - cannot store learning result without cluster information") return if not database: logger.warning("No database found in query - cannot store learning result without database information") return # Get session ID from result data or generate one session_id = result_data.get("session_id") or self._generate_session_id() # Use first table or create a generic entry primary_table = tables[0] if tables else "UnknownTable" # Create learning entry with enhanced session tracking learning_entry = { "query": query, "execution_type": execution_type, "timestamp": datetime.now().isoformat(), "session_id": session_id, "result_metadata": { "row_count": result_data.get("row_count", 0), "column_count": len(result_data.get("columns", [])), "columns": result_data.get("columns", []), "success": result_data.get("success", True), "execution_time_ms": result_data.get("execution_time_ms", 0) }, "learning_insights": { "query_complexity": len(query.split("|")), "has_filters": "where" in query.lower(), "has_aggregation": any(op in query.lower() for op in ["summarize", "count", "sum", "avg"]), "has_time_reference": "ago(" in query.lower(), "data_found": result_data.get("row_count", 0) > 0, "tables_involved": tables, "cluster": cluster_uri, "database": database } } # Store the learning result as a successful query for the primary table description = f"Learning result from {execution_type} - {result_data.get('row_count', 0)} rows - {execution_type}" self.add_successful_query(cluster_uri, database, primary_table, query, description) # Store in session-based learning section self._store_session_learning(session_id, learning_entry) # Also store in cluster-level learning results for backward compatibility self._store_cluster_learning(cluster_uri, learning_entry) logger.info(f"Stored learning result for session {session_id}: {len(query)} chars, {result_data.get('row_count', 0)} rows") except Exception as e: logger.error(f"Failed to store learning result: {e}") def _generate_session_id(self) -> str: """Generate a unique session ID.""" import uuid return f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}" def _store_session_learning(self, session_id: str, learning_entry: Dict[str, Any]): """Store learning entry in session-based structure.""" try: # Ensure sessions section exists if "sessions" not in self.corpus: self.corpus["sessions"] = {} # Ensure session exists if session_id not in self.corpus["sessions"]: self.corpus["sessions"][session_id] = { "created_at": datetime.now().isoformat(), "last_updated": datetime.now().isoformat(), "query_count": 0, "learning_entries": [], "session_insights": { "total_rows_processed": 0, "unique_tables": set(), "unique_clusters": set(), "query_types": set() } } session_data = self.corpus["sessions"][session_id] # Add learning entry session_data["learning_entries"].append(learning_entry) session_data["query_count"] += 1 session_data["last_updated"] = datetime.now().isoformat() # Update session insights insights = session_data["session_insights"] insights["total_rows_processed"] += learning_entry.get("result_metadata", {}).get("row_count", 0) insights["unique_tables"].update(learning_entry.get("learning_insights", {}).get("tables_involved", [])) insights["unique_clusters"].add(learning_entry.get("learning_insights", {}).get("cluster", "")) insights["query_types"].add(learning_entry.get("execution_type", "")) # Convert sets to lists for JSON serialization insights["unique_tables"] = list(insights["unique_tables"]) insights["unique_clusters"] = list(insights["unique_clusters"]) insights["query_types"] = list(insights["query_types"]) # Limit session entries to prevent memory bloat if len(session_data["learning_entries"]) > 100: session_data["learning_entries"] = session_data["learning_entries"][-100:] # Schedule save self._schedule_save() except Exception as e: logger.error(f"Failed to store session learning: {e}") def _store_cluster_learning(self, cluster_uri: str, learning_entry: Dict[str, Any]): """Store learning entry in cluster-level structure for backward compatibility.""" try: normalized_cluster = self._normalize_cluster_uri(cluster_uri) # Ensure cluster structure exists if normalized_cluster not in self.corpus["clusters"]: self.corpus["clusters"][normalized_cluster] = { "meta": { "token": f"{SPECIAL_TOKENS['CLUSTER_START']}{self._extract_cluster_name(normalized_cluster)}{SPECIAL_TOKENS['CLUSTER_END']}", "description": f"Cluster {normalized_cluster}", "last_accessed": datetime.now().isoformat() }, "databases": {} } cluster_data = self.corpus["clusters"][normalized_cluster] # Ensure learning_results section exists if "learning_results" not in cluster_data: cluster_data["learning_results"] = [] # Add to learning results only if the query returned rows row_count = learning_entry.get("result_metadata", {}).get("row_count", 0) if row_count and row_count > 0: cluster_data["learning_results"].append(learning_entry) # Keep only the most recent 50 learning results to prevent memory bloat if len(cluster_data["learning_results"]) > 50: cluster_data["learning_results"] = cluster_data["learning_results"][-50:] except Exception as e: logger.error(f"Failed to store cluster learning: {e}") def get_session_data(self, session_id: str) -> Dict[str, Any]: """Get session data for analysis and reporting.""" try: return self.corpus.get("sessions", {}).get(session_id, {}) except Exception as e: logger.error(f"Failed to get session data: {e}") return {} def get_session_queries(self, session_id: str) -> List[Dict[str, Any]]: """Get queries for a specific session.""" try: session_data = self.get_session_data(session_id) return session_data.get("learning_entries", []) except Exception as e: logger.error(f"Failed to get session queries: {e}") return [] def list_active_sessions(self, limit: int = 10) -> List[Dict[str, Any]]: """List active sessions with basic metadata.""" try: sessions = self.corpus.get("sessions", {}) session_list = [] for session_id, session_data in list(sessions.items())[-limit:]: session_summary = { "session_id": session_id, "created_at": session_data.get("created_at"), "last_updated": session_data.get("last_updated"), "query_count": session_data.get("query_count", 0), "total_rows": session_data.get("session_insights", {}).get("total_rows_processed", 0) } session_list.append(session_summary) return sorted(session_list, key=lambda x: x["last_updated"], reverse=True) except Exception as e: logger.error(f"Failed to list active sessions: {e}") return [] def _create_ai_friendly_token( self, table: str, cluster_uri: str, database: str, columns: Union[List[Dict[str, Any]], Dict[str, Dict[str, Any]]], ) -> str: """Create AI-friendly token with XML-style markers for efficient parsing.""" # Start with cluster, database, and table tokens token_parts = [ f"{SPECIAL_TOKENS['CLUSTER_START']}{self._extract_cluster_name(cluster_uri)}{SPECIAL_TOKENS['CLUSTER_END']}", f"{SPECIAL_TOKENS['DATABASE_START']}{database}{SPECIAL_TOKENS['DATABASE_END']}", f"{SPECIAL_TOKENS['TABLE_START']}{table}{SPECIAL_TOKENS['TABLE_END']}", ] # Derive column items from input column_items: List[Tuple[str, Dict[str, Any]]] = [] if isinstance(columns, dict): # columns is a mapping: name -> metadata column_items = list(columns.items()) elif isinstance(columns, list): # columns is a list of dicts for col in columns: if isinstance(col, dict): name = col.get("name", "unknown") meta = { "data_type": col.get("type") or col.get("data_type", "unknown"), "description": col.get("description", ""), "tags": col.get("tags", []), "sample_values": col.get("sample_values", []), } column_items.append((name, meta)) elif isinstance(col, str): column_items.append((col, {"data_type": "unknown", "description": "", "tags": [], "sample_values": []})) # Add table summary column_names = [c[0] for c in column_items] if column_items else [] table_summary = self._generate_table_summary(table, column_names) token_parts.append(f"{SPECIAL_TOKENS['SUMMARY_START']}{table_summary}{SPECIAL_TOKENS['SUMMARY_END']}") # Add columns with enhanced XML-style tokens (limit to prevent bloat) for name, meta in column_items[:10]: # Limit to 10 columns for performance col_type = meta.get("data_type", "unknown") ai_desc = meta.get("description", "") or self._generate_ai_description(name, col_type, table) tags = meta.get("tags", []) samples = meta.get("sample_values", []) # Create column token with XML-style markers col_token = ( f"{SPECIAL_TOKENS['COLUMN_START']}{name}" f"{SPECIAL_TOKENS['TYPE_START']}{col_type}{SPECIAL_TOKENS['TYPE_END']}" f"{SPECIAL_TOKENS['DESCRIPTION_START']}{ai_desc}{SPECIAL_TOKENS['DESCRIPTION_END']}" f"{SPECIAL_TOKENS['TAGS_START']}{','.join(str(t) for t in tags)}{SPECIAL_TOKENS['TAGS_END']}" f"{SPECIAL_TOKENS['SAMPLES_START']}{','.join(str(s) for s in samples[:2])}{SPECIAL_TOKENS['SAMPLES_END']}" f"{SPECIAL_TOKENS['COLUMN_END']}" ) token_parts.append(col_token) # Add truncation indicator if more columns exist if len(column_items) > 10: truncated_count = len(column_items) - 10 token_parts.append(f"[+{truncated_count}_more_columns]") full_token = "".join(token_parts) # Log token size for monitoring logger.debug( "Generated enhanced AI token for %s: %d chars, %d columns", table, len(full_token), len(column_items), ) return full_token def _generate_ai_description(self, col_name: str, col_type: str, table: str) -> str: """Generate intelligent AI description for column based on type and context.""" # Dynamic description based on data type type_lower = col_type.lower() if col_type else "" # Type-specific descriptions if 'datetime' in type_lower or 'timestamp' in type_lower: return self._describe_datetime_column(col_name) elif 'int' in type_lower or 'long' in type_lower: return self._describe_numeric_column(col_name, 'integer') elif 'real' in type_lower or 'double' in type_lower or 'float' in type_lower: return self._describe_numeric_column(col_name, 'decimal') elif 'bool' in type_lower: return f"Boolean flag indicating {self._humanize_column_name(col_name)}" elif 'guid' in type_lower or 'uuid' in type_lower: return f"Unique identifier for {self._humanize_column_name(col_name)}" elif 'dynamic' in type_lower or 'json' in type_lower: return f"Dynamic/JSON data for {self._humanize_column_name(col_name)}" elif 'string' in type_lower or 'text' in type_lower: return self._describe_text_column(col_name) else: # Generic description return f"{self._humanize_column_name(col_name)} field" def _describe_datetime_column(self, col_name: str) -> str: """Generate description for datetime columns based on patterns.""" name_lower = col_name.lower() # Look for common datetime patterns if any(pattern in name_lower for pattern in ['created', 'creation', 'createdat']): return "Timestamp when the record was created" elif any(pattern in name_lower for pattern in ['updated', 'modified', 'updatedat']): return "Timestamp when the record was last updated" elif any(pattern in name_lower for pattern in ['start', 'begin', 'from']): return "Start time of the event or period" elif any(pattern in name_lower for pattern in ['end', 'finish', 'to', 'until']): return "End time of the event or period" elif any(pattern in name_lower for pattern in ['generated', 'logged', 'recorded']): return "Timestamp when the data was generated or logged" elif 'time' in name_lower: return f"Timestamp for {self._humanize_column_name(col_name.replace('time', '').replace('Time', ''))}" else: return f"Timestamp field: {self._humanize_column_name(col_name)}" def _describe_numeric_column(self, col_name: str, num_type: str) -> str: """Generate description for numeric columns.""" name_lower = col_name.lower() if any(pattern in name_lower for pattern in ['count', 'total', 'number']): return f"Count of {self._humanize_column_name(col_name.replace('count', '').replace('Count', ''))}" elif any(pattern in name_lower for pattern in ['size', 'length', 'bytes']): return f"Size/length measurement in {self._humanize_column_name(col_name)}" elif any(pattern in name_lower for pattern in ['duration', 'elapsed', 'latency']): return f"Time duration for {self._humanize_column_name(col_name)}" elif any(pattern in name_lower for pattern in ['score', 'rating', 'rank']): return f"Score/rating value for {self._humanize_column_name(col_name)}" elif 'id' in name_lower and num_type == 'integer': return f"Numeric identifier for {self._humanize_column_name(col_name.replace('id', '').replace('Id', ''))}" elif 'port' in name_lower: return "Network port number" elif 'code' in name_lower: return f"Numeric code for {self._humanize_column_name(col_name.replace('code', '').replace('Code', ''))}" else: return f"{num_type.capitalize()} value: {self._humanize_column_name(col_name)}" def _describe_text_column(self, col_name: str) -> str: """Generate description for text columns.""" name_lower = col_name.lower() if any(pattern in name_lower for pattern in ['name', 'title', 'label']): context = col_name.replace('name', '').replace('Name', '').replace('title', '').replace('Title', '') return f"Name/identifier for {self._humanize_column_name(context)}" if context else "Name identifier" elif any(pattern in name_lower for pattern in ['message', 'description', 'text', 'content']): return f"Descriptive text: {self._humanize_column_name(col_name)}" elif any(pattern in name_lower for pattern in ['url', 'uri', 'link', 'href']): return "URL/URI reference" elif any(pattern in name_lower for pattern in ['path', 'directory', 'folder']): return "File system path" elif any(pattern in name_lower for pattern in ['email', 'mail']): return "Email address" elif any(pattern in name_lower for pattern in ['ip', 'address']): return "Network address" elif any(pattern in name_lower for pattern in ['user', 'username', 'account']): return "User account identifier" elif any(pattern in name_lower for pattern in ['type', 'category', 'class']): return f"Type/category classification for {self._humanize_column_name(col_name)}" elif any(pattern in name_lower for pattern in ['version', 'release']): return "Version identifier" else: return f"Text field: {self._humanize_column_name(col_name)}" def _humanize_column_name(self, col_name: str) -> str: """Convert column name to human-readable format.""" if not col_name: return "data" # Handle camelCase and PascalCase import re # Insert space before uppercase letters that follow lowercase letters humanized = re.sub(r'([a-z])([A-Z])', r'\1 \2', col_name) # Insert space before uppercase letters that are followed by lowercase letters humanized = re.sub(r'([A-Z]+)([A-Z][a-z])', r'\1 \2', humanized) # Replace underscores and hyphens with spaces humanized = humanized.replace('_', ' ').replace('-', ' ') # Remove extra spaces and convert to lowercase humanized = ' '.join(humanized.split()).lower() # Remove common suffixes that don't add meaning for suffix in ['field', 'column', 'value', 'data']: if humanized.endswith(' ' + suffix): humanized = humanized[:-len(suffix)-1] return humanized.strip() or "data" def _generate_table_summary(self, table_name: str, columns) -> str: """Generate compact table summary.""" col_count = len(columns) if isinstance(columns, (list, dict)) else 0 return f"{table_name.lower()}_table_{col_count}cols" def _generate_column_tags(self, col_name: str, col_type: str) -> List[str]: """Generate smart column tags based on data type and patterns.""" tags = [] type_lower = col_type.lower() if col_type else "" # Type-based tags (primary categorization) if any(t in type_lower for t in ['datetime', 'timestamp', 'date', 'time']): tags.append("TEMPORAL") elif any(t in type_lower for t in ['int', 'long', 'short', 'byte']): tags.append("INTEGER") elif any(t in type_lower for t in ['real', 'double', 'float', 'decimal']): tags.append("DECIMAL") elif any(t in type_lower for t in ['bool', 'boolean']): tags.append("BOOLEAN") elif any(t in type_lower for t in ['guid', 'uuid']): tags.append("IDENTIFIER") elif any(t in type_lower for t in ['dynamic', 'json', 'object']): tags.append("STRUCTURED") elif any(t in type_lower for t in ['string', 'text', 'varchar', 'char']): tags.append("TEXT") # Add functional tags based on data characteristics if any(t in type_lower for t in ['datetime', 'timestamp']) and tags: tags.append("SORTABLE") if any(t in type_lower for t in ['int', 'long', 'real', 'decimal']) and tags: tags.append("AGGREGATABLE") # Pattern-based functional tags (conservative, based on common patterns) name_lower = col_name.lower() if 'id' in name_lower and any(t in tags for t in ["INTEGER", "TEXT", "IDENTIFIER"]): tags.append("KEY") return list(dict.fromkeys(tags))[:3] # Remove duplicates and limit to 3 tags def _update_ai_context_cache( self, cache_key: str, schema_data: Dict[str, Any], ai_token: str = None ): """ai_context_cache removed; maintained for backward compatibility as no-op.""" return def get_ai_context_for_tables( self, cluster_uri: str, database: str, tables: List[str] ) -> List[str]: """Get enhanced AI context tokens for tables with intelligent relevance scoring.""" try: # Ensure schemas are discovered before getting context from .utils import SchemaManager schema_manager = SchemaManager(self) for table in tables: schema = self.get_schema(cluster_uri, database, table, enable_fallback=False) if not schema or not schema.get("columns"): # Force schema discovery if missing try: import asyncio loop = asyncio.get_event_loop() if loop.is_running(): # Create task if loop is running asyncio.create_task(schema_manager.get_table_schema(cluster_uri, database, table, force_refresh=True)) else: # Run synchronously if no loop asyncio.run(schema_manager.get_table_schema(cluster_uri, database, table, force_refresh=True)) logger.debug(f"Auto-discovered schema for AI context: {table}") except Exception as discovery_error: logger.debug(f"Schema auto-discovery failed for {table}: {discovery_error}") # Use the enhanced context selector for intelligent filtering context_selector = ContextSelector() all_schemas = self._get_all_schemas_for_tables(cluster_uri, database, tables) # Select relevant context using intelligent scoring selected_tokens = context_selector.select_relevant_context("", all_schemas) logger.debug( "Generated %d context tokens using intelligent relevance scoring", len(selected_tokens) ) return selected_tokens except Exception as e: logger.warning(f"Failed to get AI context for tables: {e}") return [ f"{SPECIAL_TOKENS['TABLE_START']}{table}{SPECIAL_TOKENS['TABLE_END']}" f"{SPECIAL_TOKENS['SUMMARY_START']}context_error{SPECIAL_TOKENS['SUMMARY_END']}" for table in tables ] def _get_all_schemas_for_tables(self, cluster_uri: str, database: str, tables: List[str]) -> Dict[str, Dict]: """Get all schemas for specified tables.""" all_schemas = {} normalized_cluster = self._normalize_cluster_uri(cluster_uri) for table in tables: # Get table data from new structure cluster_data = self.corpus.get("clusters", {}).get(normalized_cluster, {}) db_data = cluster_data.get("databases", {}).get(database, {}) table_data = db_data.get("tables", {}).get(table, {}) schema_data = table_data.get("schema", {}) if schema_data and schema_data.get("columns"): all_schemas[table] = schema_data else: # Create minimal schema entry for missing tables all_schemas[table] = { "columns": {}, "ai_token": f"{SPECIAL_TOKENS['TABLE_START']}{table}{SPECIAL_TOKENS['TABLE_END']}" } return all_schemas @lru_cache(maxsize=128) def get_ai_context_for_query( self, cluster_uri: str, database: str, query: str, max_tokens: int = 3000 ) -> str: """Get AI context for a query by extracting tables and building enhanced context.""" try: from .utils import parse_query_entities entities = parse_query_entities(query) extracted_tables = entities.get("tables", []) logger.debug(f"Extracted tables for AI context: {extracted_tables}") if not extracted_tables: return "" context_tokens = self.get_ai_context_for_tables( cluster_uri, database, extracted_tables ) full_context = " ".join(context_tokens) if len(full_context) > max_tokens: truncated_tokens = [] current_length = 0 for token in context_tokens: if current_length + len(token) + 1 <= max_tokens: truncated_tokens.append(token) current_length += len(token) + 1 else: break full_context = " ".join(truncated_tokens) logger.debug(f"Truncated AI context to {len(full_context)} characters") return full_context except Exception as e: logger.warning(f"Failed to get AI context for query: {e}") return "" def _compress_token(self, token: str, max_size: int) -> Optional[str]: """Compress token to fit within size limit.""" if len(token) <= max_size: return token if max_size < 100: return None # Split token into parts parts = token.split("|") if len(parts) < 4: # Need at least cluster, database, table, summary return None # Keep essential parts essential_parts = parts[:4] # cluster, database, table, summary essential_size = sum( len(part) + 1 for part in essential_parts ) # +1 for separators if essential_size >= max_size: return None # Add columns until size limit remaining_size = max_size - essential_size column_parts = [] for part in parts[4:]: # Column parts if len(part) + 1 <= remaining_size: column_parts.append(part) remaining_size -= len(part) + 1 else: break compressed_parts = essential_parts + column_parts # Add truncation indicator if needed if len(column_parts) < len(parts) - 4: truncated_count = len(parts) - 4 - len(column_parts) truncated_part = ( f"{SPECIAL_TOKENS['COLUMN']}+{truncated_count}_more" f"{SPECIAL_TOKENS['DESCRIPTION']}truncated" ) compressed_parts.append(truncated_part) return "|".join(compressed_parts) def _normalize_cluster_uri(self, cluster_input: str) -> str: """Normalize cluster URI to standard format.""" if not cluster_input or not cluster_input.strip(): raise ValueError("Cluster input cannot be empty") cluster_input = cluster_input.strip() # If already a full HTTPS URI if cluster_input.startswith("https://"): return cluster_input # If it's a full domain name if "." in cluster_input and not cluster_input.startswith("http"): return f"https://{cluster_input}" # If it's just a cluster name if re.match(r"^[a-zA-Z0-9\-_]+$", cluster_input): return f"https://{cluster_input}.kusto.windows.net" raise ValueError(f"Invalid cluster format: {cluster_input}") def _extract_cluster_name(self, cluster_uri: str) -> str: """Extract short cluster name from URI for tokens.""" if cluster_uri.startswith("https://"): hostname = cluster_uri[8:].split(".")[0] return hostname return cluster_uri def _schedule_save(self): """Schedule background save.""" if not self._save_scheduled: self._save_scheduled = True import threading threading.Thread(target=self._background_save, daemon=True).start() def _background_save(self): """Perform background save.""" try: import time time.sleep(2.0) # Wait to batch changes self._save_scheduled = False self.save_corpus() except Exception as e: logger.error(f"Background save failed: {e}") self._save_scheduled = False def _compress_schema_data(self, schema_data: Dict[str, Any]) -> Dict[str, Any]: """Apply dynamic compression to schema data to reduce memory usage.""" if not isinstance(schema_data, dict): return schema_data compressed = schema_data.copy() # Compress sample values - keep only unique, non-empty values (max 2) if "columns" in compressed and isinstance(compressed["columns"], dict): for col_name, col_data in compressed["columns"].items(): if isinstance(col_data, dict) and "sample_values" in col_data: samples = col_data["sample_values"] if isinstance(samples, list): # Remove duplicates while preserving order, filter empty values unique_samples = [] seen = set() for sample in samples: if sample and str(sample).strip() and str(sample) not in seen: unique_samples.append(sample) seen.add(str(sample)) if len(unique_samples) >= 2: break col_data["sample_values"] = unique_samples return compressed def _should_compress_cluster_data(self, cluster_uri: str) -> bool: """Check if cluster data exceeds memory limits and needs compression.""" try: cluster_data = self.corpus.get("clusters", {}).get(cluster_uri, {}) cluster_json = json.dumps(cluster_data) cluster_size = len(cluster_json.encode('utf-8')) return cluster_size > self._memory_size_limit except Exception: return False def _compress_cluster_data(self, cluster_uri: str): """Apply compression to cluster data to reduce memory usage.""" try: cluster_data = self.corpus.get("clusters", {}).get(cluster_uri, {}) if not cluster_data: return # Remove oldest successful queries to maintain memory limits for db_name, db_data in cluster_data.get("databases", {}).items(): for table_name, table_data in db_data.get("tables", {}).items(): if isinstance(table_data, dict) and "successful_queries" in table_data: queries = table_data["successful_queries"] if isinstance(queries, list) and len(queries) > 5: # Keep only 5 most recent queries table_data["successful_queries"] = queries[-5:] # Remove old learning results if "learning_results" in cluster_data: learning_results = cluster_data["learning_results"] if isinstance(learning_results, list) and len(learning_results) > 25: cluster_data["learning_results"] = learning_results[-25:] logger.debug(f"Applied compression to cluster {cluster_uri}") except Exception as e: logger.warning(f"Failed to compress cluster data for {cluster_uri}: {e}") def save_corpus(self): """Save corpus to disk with thread safety.""" with _memory_lock: try: self.corpus["last_updated"] = datetime.now().isoformat() self.memory_path.parent.mkdir(parents=True, exist_ok=True) # Atomic save temp_path = self.memory_path.with_suffix(".tmp") with open(temp_path, "w", encoding="utf-8") as f: json.dump(self.corpus, f, indent=2, ensure_ascii=False, default=str) temp_path.replace(self.memory_path) logger.debug(f"Saved unified memory to {self.memory_path}") except Exception as e: logger.error(f"Failed to save memory: {e}") def get_memory_stats(self) -> Dict[str, Any]: """ Get comprehensive memory statistics. Returns: Dictionary with memory usage statistics """ try: corpus = self.corpus or {} clusters = corpus.get("clusters", {}) clusters_count = len(clusters) total_schemas = 0 total_queries = 0 total_tables = 0 for cluster_data in clusters.values(): if not isinstance(cluster_data, dict): continue # Count successful queries at cluster level total_queries += len(cluster_data.get("successful_queries", [])) # Count schemas and tables dbs = cluster_data.get("databases", {}) for db_data in dbs.values(): if not isinstance(db_data, dict): continue tables = db_data.get("tables", {}) total_tables += len(tables) for table_data in tables.values(): if isinstance(table_data, dict) and table_data.get("schema", {}).get("columns"): total_schemas += 1 # Count table-level successful queries total_queries += len(table_data.get("successful_queries", [])) # Calculate memory size import json corpus_json = json.dumps(corpus, default=str) memory_size_kb = len(corpus_json.encode('utf-8')) / 1024 return { "clusters_count": clusters_count, "total_schemas": total_schemas, "total_queries": total_queries, "total_tables": total_tables, "memory_size_kb": round(memory_size_kb, 2), "last_updated": corpus.get("last_updated"), "version": corpus.get("version", "3.0") } except Exception as e: logger.error(f"Failed to get memory stats: {e}") return { "error": str(e), "clusters_count": 0, "total_schemas": 0, "total_queries": 0 } def clear_memory(self) -> bool: """Clear all memory.""" try: self.corpus = self._create_empty_corpus() self.save_corpus() logger.info("Memory cleared") return True except Exception as e: logger.error(f"Failed to clear memory: {e}") return False # Global instance _memory_manager = None def get_memory_manager() -> MemoryManager: """Get global memory manager instance.""" global _memory_manager if _memory_manager is None: _memory_manager = MemoryManager() return _memory_manager def get_knowledge_corpus(): """Compatibility adapter expected by legacy tests: return an object with memory_manager.""" class KnowledgeCorpus: def __init__(self): self.memory_manager = get_memory_manager() return KnowledgeCorpus() # Convenience functions for backward compatibility and easy integration def get_context_for_tables( cluster_uri: str, database: str, tables: List[str], memory_path: Optional[str] = None, ) -> List[str]: """Get AI context tokens for tables.""" memory = get_memory_manager() return memory.get_ai_context_for_tables(cluster_uri, database, tables) def ensure_table_in_memory(cluster_uri: str, database: str, table: str) -> bool: """Ensure table schema exists in memory (simplified check).""" try: memory = get_memory_manager() schema = memory.get_schema(cluster_uri, database, table) return bool(schema) except Exception as e: logger.warning(f"Failed to check table in memory: {e}") return False def get_table_ai_token(cluster_uri: str, database: str, table: str) -> Optional[str]: """Get AI token for a specific table.""" try: memory = get_memory_manager() normalized_cluster = memory._normalize_cluster_uri(cluster_uri) # Prefer token from stored table entry cluster_data = memory.corpus.get("clusters", {}).get(normalized_cluster, {}) try: return cluster_data.get("databases", {}).get(database, {}).get("tables", {}).get(table, {}).get("ai_token") except Exception: return None except Exception as e: logger.warning(f"Failed to get AI token: {e}") return None def store_pattern_analysis( cluster_uri: str, database: str, table: str, pattern_data: Dict[str, Any] ): """Store pattern analysis data (simplified for compatibility).""" try: memory = get_memory_manager() # Store as part of schema data schema_data = memory.get_schema(cluster_uri, database, table) if schema_data: schema_data["pattern_analysis"] = pattern_data memory.store_schema(cluster_uri, database, table, schema_data) except Exception as e: logger.warning(f"Failed to store pattern analysis: {e}") def update_memory_after_query( cluster_uri: str, database: str, tables: List[str], cluster_memory_path: Optional[str] = None, ): """Update memory after query execution (placeholder for compatibility).""" logger.debug(f"Memory update triggered for {len(tables)} tables") # This function is now handled by the 2-step flow in execute_kql.py def clear_memory_cache() -> bool: """Clear memory cache.""" try: memory = get_memory_manager() return memory.clear_memory() except Exception as e: logger.error(f"Failed to clear memory cache: {e}") return False # Simplified utility functions for backward compatibility def get_dynamic_query_builder(): """Minimal query builder fallback.""" class SimpleQueryBuilder: def build_from_natural_language(self, query: str, context: dict, session_context=None): from .utils import parse_query_entities entities = parse_query_entities(query) class SimpleQuery: def __init__(self, kql_query: str): self.kql_query = kql_query def to_kql(self): return self.kql_query def get_telemetry(self): return {"confidence_score": 0.8, "method": "simple_builder"} # Use first table or default table = entities["tables"][0] if entities["tables"] else "StormEvents" return SimpleQuery(f"{table} | take 10") return SimpleQueryBuilder() def get_telemetry_collector(): """Minimal telemetry collector.""" class SimpleCollector: def track_query(self, query_id: str, query_type: str): class Tracker: def __enter__(self): return self def __exit__(self, *args): pass return Tracker() return SimpleCollector() async def kql_schema_memory_tool(natural_language_query: str = None, session_id: str = None): """Enhanced schema memory tool with forced discovery when missing.""" from .utils import parse_query_entities, SchemaManager entities = parse_query_entities(natural_language_query or "") cluster, database = entities["cluster"], entities["database"] tables = entities["tables"] if not cluster or not database: return {"error": "Missing cluster or database specification"} mm = get_memory_manager() # List tables request if any(keyword in (natural_language_query or "").lower() for keyword in ["list tables", "show tables", "what tables"]): try: schema_manager = SchemaManager() db_schema = await schema_manager.get_database_schema(cluster, database) # Force discovery even without tables - use db schema if "list tables" in (natural_language_query or "").lower(): mm.store_database_schema(cluster, database, db_schema) return {"schemas": [{"cluster": cluster, "database": database, "tables_available": db_schema.get("tables", [])}]} except Exception as e: return {"error": f"Failed to list tables: {e}"} # Table schema request table = tables[0] if tables else None if not table: return {"error": "No table specified"} schema = mm.get_schema(cluster, database, table) # Force schema discovery if missing, even on tool call if not schema or not schema.get("columns"): try: schema_manager = SchemaManager() schema = await schema_manager.get_table_schema(cluster, database, table, force_refresh=True) if schema: # Always store post-discovery to ensure persistence mm.store_schema(cluster, database, table, schema) logger.info(f"Forced schema discovery and storage for {database}.{table}") except Exception as e: logger.warning(f"Forced schema discovery failed for {table}: {e}") return {"error": f"Schema discovery failed: {e}"} # Extract columns efficiently columns = [] if isinstance(schema, dict): cols = schema.get("columns") or schema.get("column_types", {}) if isinstance(cols, dict): columns = list(cols.keys()) elif isinstance(cols, list): columns = [c if isinstance(c, str) else c.get("name", "") for c in cols] return {"schemas": [{"cluster": cluster, "database": database, "table": table, "columns": columns}]} async def get_session_context(session_id: str): """Minimal session context.""" return {"session_id": session_id, "conversation_state": "active"} def get_memory_stats() -> Dict[str, int]: """ Return basic memory statistics for tests and diagnostics. Keys provided: - clusters_count: number of clusters recorded in the corpus - total_schemas: total number of stored table schemas across all clusters/databases - total_queries: total number of stored query execution history entries """ try: mm = get_memory_manager() corpus = getattr(mm, "corpus", {}) or {} clusters = corpus.get("clusters", {}) if isinstance(corpus, dict) else {} clusters_count = len(clusters) total_schemas = 0 for cluster_data in clusters.values(): if not isinstance(cluster_data, dict): continue dbs = cluster_data.get("databases", {}) or {} for db_data in dbs.values(): if not isinstance(db_data, dict): continue tables = db_data.get("tables", {}) or {} total_schemas += len(tables) # Determine total stored successful queries across clusters. # The legacy 'query_execution_history' was removed. Derive the total # from the per-cluster 'successful_queries' lists (if populated). total_queries = 0 try: for cluster_data in clusters.values(): if not isinstance(cluster_data, dict): continue total_queries += len(cluster_data.get("successful_queries", []) or []) except Exception: total_queries = 0 return { "clusters_count": clusters_count, "total_schemas": total_schemas, "total_queries": total_queries, } except Exception as e: logger.warning(f"Failed to compute memory stats: {e}") return {"clusters_count": 0, "total_schemas": 0, "total_queries": 0}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/4R9UN/mcp-kql-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server