execute_kql.py•49.5 kB
"""
Streamlined KQL Query Execution Module
This module provides simplified KQL query execution with Azure authentication
and integrated schema management using the centralized SchemaManager.
Author: Arjun Trivedi
Email: arjuntrivedi42@yahoo.com
"""
import asyncio
import logging
import re
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from .utils import (
extract_cluster_and_database_from_query,
extract_tables_from_query,
generate_query_description,
QueryProcessor,
retry_on_exception
)
logger = logging.getLogger(__name__)
# Global QueryProcessor instance for consistent query processing
_query_processor = None
def get_query_processor():
"""Lazy load query processor to avoid circular imports."""
global _query_processor
if _query_processor is None:
try:
from .memory import get_memory_manager
memory = get_memory_manager()
_query_processor = QueryProcessor(memory)
except Exception as e:
logger.warning(f"QueryProcessor not available: {e}")
_query_processor = None
return _query_processor
# Import schema validator at module level - now from memory.py
_schema_validator = None
def get_schema_validator():
"""Lazy load schema validator to avoid circular imports."""
global _schema_validator
if _schema_validator is None:
try:
from .memory import get_memory_manager
memory = get_memory_manager()
# Schema validator is now part of MemoryManager
_schema_validator = memory
except Exception as e:
logger.warning(f"Schema validator not available: {e}")
_schema_validator = None
return _schema_validator
def classify_error_dynamically(error_message: str, status_code: Optional[int] = None) -> Dict[str, Any]:
"""
Dynamically classify errors to determine retry strategy and handling approach.
Replaces static RETRYABLE_ERROR_PATTERNS with intelligent error analysis.
Returns:
Dict with keys: is_retryable, error_category, suggested_action, retry_delay
"""
if not error_message:
return {
"is_retryable": False,
"error_category": "unknown",
"suggested_action": "investigate",
"retry_delay": 0
}
error_lower = error_message.lower()
# Network and connection errors - highly retryable
network_indicators = [
"connection", "timeout", "network", "socket", "dns", "host",
"unreachable", "refused", "reset", "aborted"
]
if any(indicator in error_lower for indicator in network_indicators):
return {
"is_retryable": True,
"error_category": "network",
"suggested_action": "retry_with_backoff",
"retry_delay": 2.0
}
# Service availability errors - retryable with longer delay
service_indicators = [
"service", "unavailable", "busy", "overload", "throttl", "rate limit",
"too many requests", "capacity", "resource exhausted"
]
if any(indicator in error_lower for indicator in service_indicators):
return {
"is_retryable": True,
"error_category": "service_availability",
"suggested_action": "retry_with_longer_delay",
"retry_delay": 5.0
}
# Authentication errors - potentially retryable if token-related
auth_indicators = ["token", "expired", "authentication", "unauthorized"]
if any(indicator in error_lower for indicator in auth_indicators):
if "expired" in error_lower or "refresh" in error_lower:
return {
"is_retryable": True,
"error_category": "auth_token",
"suggested_action": "refresh_token_and_retry",
"retry_delay": 1.0
}
else:
return {
"is_retryable": False,
"error_category": "auth_permanent",
"suggested_action": "check_credentials",
"retry_delay": 0
}
# Syntax and validation errors - not retryable
syntax_indicators = [
"syntax", "invalid", "malformed", "parse", "validation",
"bad request", "semantic", "syn0002", "sem0001"
]
if any(indicator in error_lower for indicator in syntax_indicators):
return {
"is_retryable": False,
"error_category": "syntax",
"suggested_action": "fix_query_syntax",
"retry_delay": 0
}
# Permission errors - not retryable
permission_indicators = ["forbidden", "permission", "access denied", "not authorized"]
if any(indicator in error_lower for indicator in permission_indicators):
return {
"is_retryable": False,
"error_category": "permission",
"suggested_action": "check_permissions",
"retry_delay": 0
}
# Resource not found - not retryable
notfound_indicators = ["not found", "does not exist", "missing", "unknown table", "unknown database"]
if any(indicator in error_lower for indicator in notfound_indicators):
return {
"is_retryable": False,
"error_category": "resource_not_found",
"suggested_action": "verify_resource_exists",
"retry_delay": 0
}
# Status code-based classification
if status_code:
if 500 <= status_code < 600:
return {
"is_retryable": True,
"error_category": "server_error",
"suggested_action": "retry_with_backoff",
"retry_delay": 3.0
}
elif status_code == 429:
return {
"is_retryable": True,
"error_category": "rate_limit",
"suggested_action": "retry_with_longer_delay",
"retry_delay": 10.0
}
elif 400 <= status_code < 500:
return {
"is_retryable": False,
"error_category": "client_error",
"suggested_action": "fix_request",
"retry_delay": 0
}
# Check for specific Kusto error codes
if "sem0100" in error_lower or "sem0001" in error_lower:
return {
"is_retryable": False,
"error_category": "syntax",
"suggested_action": "fix_query_syntax",
"retry_delay": 0
}
# Default classification for unknown errors - conservative retry
return {
"is_retryable": True,
"error_category": "unknown",
"suggested_action": "retry_with_caution",
"retry_delay": 2.0
}
def should_retry_error(error_message: str, status_code: Optional[int] = None) -> bool:
"""
Determine if an error should be retried using dynamic classification.
Replaces static pattern matching with intelligent error analysis.
"""
classification = classify_error_dynamically(error_message, status_code)
return classification["is_retryable"]
def clean_query_for_execution(query: str) -> str:
"""
Cleans a KQL query to prevent common syntax errors, including SEM0002.
- Strips leading/trailing whitespace.
- Returns an empty string if the query is genuinely empty or only contains whitespace.
- Handles queries that contain only comments by returning an empty string.
"""
if not query or not query.strip():
return ""
# Strip leading/trailing whitespace for clean processing.
query = query.strip()
# Handle comment-only queries.
lines = query.split('\n')
non_comment_lines = [line for line in lines if not line.strip().startswith('//')]
if not non_comment_lines:
# If all lines are comments, there's no executable query.
return ""
# Reconstruct the query from non-comment lines.
cleaned_query = '\n'.join(non_comment_lines).strip()
# Apply additional normalization only if we have content
if cleaned_query:
# Apply core syntax normalization
cleaned_query = normalize_kql_syntax(cleaned_query)
# Apply dynamic error-based fixes
cleaned_query = _apply_dynamic_fixes(cleaned_query)
return cleaned_query
def normalize_kql_syntax(query: str) -> str:
"""Optimized KQL syntax normalization with comprehensive error prevention."""
if not query:
return ""
# Normalize whitespace
query = re.sub(r'\s+', ' ', query.strip())
# Dynamic regex patterns for common KQL errors
error_patterns = [
(r'\|([a-zA-Z])', r'| \1'), # Fix pipe spacing
(r'([a-zA-Z0-9_])(==|!=|<=|>=|<|>)([a-zA-Z0-9_])', r'\1 \2 \3'), # Operator spacing
(r'\|\|+', '|'), # Double pipes
(r'\s*\|\s*', ' | '), # Pipe normalization
(r'\s+(and|or|==|!=|<=|>=|<|>)\s*$', ''), # Trailing operators
(r';\s*$', ''), # Semicolons
]
for pattern, replacement in error_patterns:
query = re.sub(pattern, replacement, query, flags=re.IGNORECASE)
# Normalize project clauses
query = re.sub(r'\|\s*project\s+([^|]+)',
lambda m: '| project ' + _normalize_project_clause(m.group(1)), query)
return query.strip()
def _apply_dynamic_fixes(query: str) -> str:
"""Apply minimal, conservative fixes to prevent SYN0002, SEM0100, and other common errors without over-processing."""
if not query or not query.strip():
return query # Return original if empty to preserve intent
original_query = query
query = query.strip()
# CONSERVATIVE APPROACH: Only fix clear syntax errors, avoid aggressive transformations
# 1. Remove trailing incomplete operators that cause SYN0002 (but only obvious cases)
# Only remove if query ends with operator and nothing else
if re.search(r'\s+(and|or)\s*$', query, re.IGNORECASE):
fixed_query = re.sub(r'\s+(and|or)\s*$', '', query, flags=re.IGNORECASE)
if fixed_query.strip(): # Only apply if result is not empty
query = fixed_query
logger.debug("Removed trailing logical operator")
# 2. Fix incomplete pipe operations - only if query literally ends with "|"
if query.rstrip().endswith('|'):
fixed_query = query.rstrip('|').strip()
if fixed_query.strip(): # Only apply if result is not empty
query = fixed_query
logger.debug("Removed trailing pipe operator")
# DON'T auto-add "| take 10" - let the user specify what they want
# 3. Fix obvious double operators (but be conservative)
if re.search(r'(==|!=|<=|>=)\s*(==|!=|<=|>=)', query):
query = re.sub(r'(==|!=|<=|>=)\s*(==|!=|<=|>=)', r'\1', query)
logger.debug("Fixed double comparison operators")
# 4. Fix malformed project clauses (only obvious syntax errors)
if re.search(r'\|\s*project\s*,', query, re.IGNORECASE):
query = re.sub(r'\|\s*project\s*,', '| project', query, flags=re.IGNORECASE)
logger.debug("Fixed project clause starting with comma")
# 5. SEM0001 fixes - join syntax (minimal fixes only)
if ' join ' in query.lower():
query = _fix_join_syntax(query)
# 6. REMOVED: Don't auto-complete incomplete expressions - let syntax validation catch them
# This was too aggressive and could break valid queries
# 7. REMOVED: Don't auto-add "| take 10" - preserve user intent
# Final safety check - if processing resulted in empty query, return original
if not query.strip():
logger.warning("Query processing resulted in empty query, returning original")
return original_query
return query
def _fix_join_syntax(query: str) -> str:
"""Fix common join syntax issues dynamically."""
# Replace 'or' with 'and' in join conditions (SEM0001 prevention)
join_pattern = re.compile(r'(\bjoin\b\s+(?:\w+\s+)?(?:\([^)]+\)\s+)?(?:\w+\s+)?on\s+)([^|]+)', re.IGNORECASE)
def fix_join_condition(match):
prefix, condition = match.groups()
# Replace 'or' with 'and' in join context
condition = re.sub(r'\bor\b', 'and', condition, flags=re.IGNORECASE)
# Ensure equality operators only
condition = re.sub(r'\b(\w+)\s*!=\s*(\w+)', r'\1 == \2', condition)
return prefix + condition
return join_pattern.sub(fix_join_condition, query)
def _normalize_project_clause(project_content: str) -> str:
"""
Enhanced normalize project clause to prevent column resolution errors.
"""
if not project_content:
return "*"
# Split on commas and clean each column
columns = []
for col in project_content.split(','):
col = col.strip()
if col:
# Remove any trailing operators or incomplete expressions
col = re.sub(r'\s+(and|or|==|!=|<=|>=|<|>)\s*$', '', col, flags=re.IGNORECASE)
if col: # Only add if still has content
# Apply bracketing for potentially problematic column names
if re.match(r'^[A-Za-z_][A-Za-z0-9_]*$', col) and not col.startswith('['):
# Check if this looks like a problematic column name
col_lower = col.lower()
problematic_patterns = {
'entityvalue', 'entitytype', 'evidencetype', 'alertname', 'alerttype',
'username', 'computername', 'processname', 'filename', 'filepath'
}
from .constants import KQL_RESERVED_WORDS
if (col_lower in {w.lower() for w in KQL_RESERVED_WORDS} or
col_lower in problematic_patterns or
re.match(r'^[A-Z][a-z]+[A-Z]', col)): # CamelCase pattern
col = f"['{col}']"
columns.append(col)
return ', '.join(columns) if columns else "*"
def validate_kql_query_syntax(query: str) -> Tuple[bool, str]:
"""
Conservative KQL query syntax validation focused on preventing critical errors.
Only validates essential syntax issues, allows more queries through.
Returns:
Tuple of (is_valid: bool, error_message: str)
"""
try:
if not query or not query.strip():
return False, "Query cannot be empty"
query_clean = query.strip()
query_lower = query_clean.lower()
# Check for management commands (permissive)
if query_lower.startswith('.'):
# Allow most management commands, only block obviously invalid ones
invalid_mgmt_patterns = ['.invalid', '.bad', '.error']
if any(pattern in query_lower for pattern in invalid_mgmt_patterns):
return False, "Invalid management command"
return True, ""
# CONSERVATIVE VALIDATION: Only check for critical syntax errors
# 1. Check for incomplete operators at the end (only obvious cases)
if re.search(r'\s+(and|or)\s*$', query_clean, re.IGNORECASE):
return False, "Query ends with incomplete logical operator"
# 2. Check for incomplete pipe operations (only if literally ends with "|")
if query_clean.rstrip().endswith('|'):
return False, "Query ends with incomplete pipe operator"
# 3. Check for double pipes (clear syntax error)
if '||' in query_clean:
return False, "Invalid double pipe operator (||) - use single pipe (|)"
# 4. RELAXED: Only check for completely empty operations (more permissive)
critical_empty_operations = [
(r'\|\s*project\s*$', "Empty project clause"),
(r'\|\s*where\s*$', "Empty where clause"),
]
for pattern, error_msg in critical_empty_operations:
if re.search(pattern, query_clean, re.IGNORECASE):
return False, f"{error_msg}"
# 5. RELAXED: Project validation (only check for obvious syntax errors)
project_match = re.search(r'\|\s*project\s+([^|]*)', query_clean, re.IGNORECASE)
if project_match:
project_content = project_match.group(1).strip()
# Only fail on completely empty project content
if not project_content:
return False, "Empty project clause"
# Only check for obvious comma errors
if re.search(r',\s*,', project_content):
return False, "Project clause has empty column between commas"
# 6. RELAXED: Don't enforce table name patterns - too restrictive
# Remove the table name validation entirely as it was too aggressive
# 7. Check for unmatched parentheses (still important)
open_parens = query_clean.count('(')
close_parens = query_clean.count(')')
if open_parens != close_parens:
return False, f"Unmatched parentheses: {open_parens} open, {close_parens} close"
# 8. Check for unmatched quotes (still important)
single_quotes = query_clean.count("'")
if single_quotes % 2 != 0:
return False, "Unmatched single quotes in query"
# 9. REMOVED: Invalid character validation was too restrictive
# 10. RELAXED: Join validation - only warn, don't fail
if 'join' in query_lower:
logger.debug(f"Query contains join operation: {query_clean[:100]}")
# 11. RELAXED: Where clause validation - only check for empty content
where_match = re.search(r'\|\s*where\s+([^|]*)', query_clean, re.IGNORECASE)
if where_match:
where_content = where_match.group(1).strip()
if not where_content:
return False, "Empty where clause"
# REMOVED: Don't fail on trailing logical operators - let KQL engine handle it
return True, ""
except Exception as e:
# Be more permissive - don't fail on validation errors
logger.warning(f"Syntax validation error: {str(e)}")
return True, "" # Allow query through if validation itself fails
def validate_query(query: str) -> Tuple[str, str]:
"""
Validate a KQL query and extract cluster and database information.
Args:
query: The KQL query to validate
Returns:
Tuple of (cluster_uri, database)
Raises:
ValueError: If query is invalid or missing required components
"""
if not query or not query.strip():
raise ValueError("Query cannot be empty")
try:
cluster, database = extract_cluster_and_database_from_query(query)
if not cluster:
raise ValueError("Query must include cluster specification")
if not database:
# According to test expectations, missing database should be treated as invalid cluster format
raise ValueError("Query must include cluster specification - invalid cluster format without database")
return cluster, database
except Exception as e:
if "cluster" in str(e).lower() or "database" in str(e).lower():
raise
raise ValueError(f"Invalid query format: {e}")
def _normalize_cluster_uri(cluster_uri: str) -> str:
"""Normalize cluster URI for connection."""
if not cluster_uri:
raise ValueError("Cluster URI cannot be None or empty")
if not cluster_uri.startswith("https://"):
cluster_uri = f"https://{cluster_uri}"
return cluster_uri.rstrip("/")
def _get_kusto_client(cluster_url: str) -> KustoClient:
"""Create and authenticate a Kusto client."""
kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(cluster_url)
return KustoClient(kcsb)
def _parse_kusto_response(response) -> pd.DataFrame:
"""Parse a Kusto response into a pandas DataFrame."""
if not response or not getattr(response, "primary_results", None):
return pd.DataFrame()
first_result = response.primary_results[0]
df = None
try:
td = first_result.to_dict()
if isinstance(td, dict) and "data" in td and td["data"] is not None:
df = pd.DataFrame(td["data"])
except Exception:
df = None
if df is None:
try:
rows = list(first_result)
cols = [c.column_name for c in getattr(first_result, "columns", []) if hasattr(c, "column_name")]
if rows and isinstance(rows[0], (list, tuple)) and cols:
df = pd.DataFrame(rows, columns=cols)
else:
df = pd.DataFrame(rows)
except Exception:
df = pd.DataFrame()
return df
@retry_on_exception()
def _execute_kusto_query_sync(kql_query: str, cluster: str, database: str, timeout: int = 300) -> pd.DataFrame:
"""
Core synchronous function to execute a KQL query against a Kusto cluster.
Adds configurable request timeout and uses retry decorator for transient failures.
"""
cluster_url = _normalize_cluster_uri(cluster)
logger.info(f"Executing KQL on {cluster_url}/{database}: {kql_query[:150]}...")
client = _get_kusto_client(cluster_url)
try:
is_mgmt_query = kql_query.strip().startswith('.')
# First execution attempt
try:
if is_mgmt_query:
response = client.execute_mgmt(database, kql_query)
else:
response = client.execute(database, kql_query)
df = _parse_kusto_response(response)
logger.debug(f"Query returned {len(df)} rows.")
try:
loop = asyncio.get_running_loop()
loop.create_task(_post_execution_learning_bg(kql_query, cluster, database, df))
except RuntimeError:
logger.debug("No event loop running - skipping background learning task")
return df
except KustoServiceError as e:
# Check if this is a retryable SEM0100-like error
classification = classify_error_dynamically(str(e))
if classification.get('is_retryable') and 'sem0100' in str(e).lower():
logger.info(f"SEM0100 error detected, attempting auto-bracketing retry: {str(e)[:100]}")
bracketed_query = bracket_suspect_identifiers(kql_query)
if bracketed_query != kql_query:
logger.debug(f"Retrying with bracketed identifiers: {bracketed_query[:150]}")
if is_mgmt_query:
response = client.execute_mgmt(database, bracketed_query)
else:
response = client.execute(database, bracketed_query)
df = _parse_kusto_response(response)
logger.info(f"SEM0100 retry successful - query returned {len(df)} rows")
return df
else:
logger.warning("Auto-bracketing did not change the query, re-raising original error")
# Re-raise the original error if not retryable or retry failed
raise
finally:
if client:
client.close()
def execute_large_query(query: str, cluster: str, database: str, chunk_size: int = 1000, timeout: int = 300) -> pd.DataFrame:
"""
Minimal query chunking helper.
- If the query already contains explicit 'take' or 'limit', execute as-is.
- Otherwise run a single timed execution (safe fallback).
This conservative approach avoids aggressive query rewriting while enabling
an explicit place to improve chunking later.
"""
if ' take ' in (query or "").lower() or ' limit ' in (query or "").lower():
return _execute_kusto_query_sync(query, cluster, database, timeout)
# Fallback: single execution with configured timeout & retries
return _execute_kusto_query_sync(query, cluster, database, timeout)
def bracket_suspect_identifiers(query: str) -> str:
"""
Enhanced auto-bracket identifiers that might cause SEM0100 resolution errors.
This function brackets:
- Reserved keywords when used as identifiers
- Identifiers starting with numbers
- Identifiers containing special characters
- Column names that commonly cause resolution failures
"""
if not query:
return query
import re
from .constants import KQL_RESERVED_WORDS
# Additional patterns that commonly cause SEM0100 errors
problematic_patterns = {
'entityvalue', 'entitytype', 'evidencetype', 'alertname', 'alerttype',
'username', 'computername', 'processname', 'filename', 'filepath',
'ipaddress', 'domainname', 'accountname', 'logontype', 'eventtype'
}
def bracket_match(match):
ident = match.group(0)
ident_lower = ident.lower()
# Skip if already in quotes or brackets
if match.start() > 0:
prev_char = query[match.start() - 1]
if prev_char in ["'", '"', '[']:
return ident
# Check if it's a reserved keyword
if ident_lower in {w.lower() for w in KQL_RESERVED_WORDS}:
return f"['{ident}']"
# Check if it's a problematic pattern
if ident_lower in problematic_patterns:
return f"['{ident}']"
# Check if it starts with a number
if re.match(r'^\d', ident):
return f"['{ident}']"
# Check if it contains special characters that need bracketing
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', ident):
return f"['{ident}']"
# Check for CamelCase patterns that might need bracketing
if re.match(r'^[A-Z][a-z]+[A-Z]', ident):
return f"['{ident}']"
return ident
# More precise regex that handles project clauses specifically
# First, handle project clauses specially
def bracket_project_columns(project_match):
project_content = project_match.group(1)
# Split columns and bracket each one
columns = []
for col in project_content.split(','):
col = col.strip()
if col and not col.startswith('[') and not col.startswith("'"):
# Apply bracketing logic to each column
if re.match(r'^[A-Za-z_][A-Za-z0-9_]*$', col):
col_lower = col.lower()
if (col_lower in {w.lower() for w in KQL_RESERVED_WORDS} or
col_lower in problematic_patterns or
re.match(r'^[A-Z][a-z]+[A-Z]', col)):
col = f"['{col}']"
columns.append(col)
else:
columns.append(col)
return f"| project {', '.join(columns)}"
# Handle project clauses first
query = re.sub(r'\|\s*project\s+([^|]+)', bracket_project_columns, query, flags=re.IGNORECASE)
# Then handle other identifiers (but avoid those already in project clauses)
# This regex is more careful to avoid double-bracketing
query = re.sub(r"(?<!['\"[\]])\b([A-Za-z_][A-Za-z0-9_]*)\b(?![\]'\"])", bracket_match, query)
return query
# Essential functions for compatibility
def validate_kql_query_advanced(query: str, cluster: str = None, database: str = None) -> Dict[str, Any]:
"""
Simplified KQL query validation.
"""
try:
if not query or not query.strip():
return {
"valid": False,
"error": "Query cannot be empty",
"suggestions": []
}
# Basic KQL syntax validation
query_lower = query.lower().strip()
# Check for management commands
if query_lower.startswith('.') and not any(cmd in query_lower for cmd in ['.show', '.list', '.help']):
return {
"valid": False,
"error": "Invalid management command",
"suggestions": ["Use .show tables or .show databases for management commands"]
}
return {
"valid": True,
"cluster": cluster,
"database": database,
"suggestions": []
}
except Exception as e:
return {
"valid": False,
"error": str(e),
"suggestions": []
}
def kql_execute_tool(kql_query: str, cluster_uri: str = None, database: str = None) -> pd.DataFrame:
"""
Enhanced KQL execution function with consolidated QueryProcessor pipeline.
"""
try:
# ENHANCED INPUT VALIDATION with detailed error messages
if not kql_query or not kql_query.strip():
logger.error("Empty query provided to kql_execute_tool")
raise ValueError("KQL query cannot be None or empty")
original_query = kql_query
# Get the QueryProcessor for consolidated processing
processor = get_query_processor()
if processor and cluster_uri and database:
try:
# Use the QueryProcessor's consolidated pipeline
logger.info("Starting consolidated query processing pipeline...")
# Handle async context properly for the processing pipeline
try:
current_loop = asyncio.get_running_loop()
# We're in an async context - skip for now to avoid conflicts
logger.info("Async context detected - using simplified processing")
clean_query = processor.clean(kql_query)
# Apply basic optimization without full async validation
if cluster_uri and database:
try:
# Get schema for optimization
from .memory import get_memory_manager
memory = get_memory_manager()
tables = extract_tables_from_query(clean_query)
if tables:
target_table = tables[0]
schema_info = memory.get_schema(cluster_uri, database, target_table, enable_fallback=False)
if schema_info:
clean_query = processor.optimize(clean_query, schema_info)
logger.info("Applied QueryProcessor optimization")
except Exception as opt_error:
logger.debug(f"Schema-based optimization failed: {opt_error}")
except RuntimeError:
# No running loop, safe to use full async pipeline
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
clean_query = loop.run_until_complete(
processor.process(kql_query, cluster_uri, database)
)
logger.info("QueryProcessor pipeline completed successfully")
finally:
loop.close()
asyncio.set_event_loop(None)
except Exception as processing_error:
logger.error(f"QueryProcessor pipeline failed: {processing_error}")
# Fallback to basic cleaning
clean_query = clean_query_for_execution(kql_query)
else:
logger.warning("QueryProcessor not available - using legacy processing")
# Fallback to legacy processing
clean_query = clean_query_for_execution(kql_query)
# Check if processing resulted in empty query
if not clean_query or not clean_query.strip():
logger.warning(f"Query processing resulted in empty query from: {original_query[:100]}")
raise ValueError("Query appears to be empty or contains only comments/whitespace")
# COMPREHENSIVE SYNTAX VALIDATION with fallback repair attempts
is_valid, validation_error = validate_kql_query_syntax(clean_query)
if not is_valid:
logger.warning(f"Syntax validation failed: {validation_error}")
# FALLBACK STRATEGY 1: Try to repair common syntax issues
logger.info("Attempting query repair...")
repaired_query = _apply_dynamic_fixes(clean_query)
# Ensure repair didn't create empty query
if not repaired_query or not repaired_query.strip():
logger.error("Query repair resulted in empty query")
raise ValueError(f"Invalid KQL syntax and repair failed: {validation_error}")
# Re-validate repaired query
is_repaired_valid, repair_validation_error = validate_kql_query_syntax(repaired_query)
if is_repaired_valid:
logger.info("Query successfully repaired")
clean_query = repaired_query
else:
logger.error(f"Query repair failed: {repair_validation_error}")
# FALLBACK STRATEGY 2: Try minimal safe query if possible
if cluster_uri and database:
logger.info("Applying minimal safe query fallback")
try:
tables = extract_tables_from_query(original_query)
if tables:
safe_query = f"{tables[0]} | take 10"
safe_valid, _ = validate_kql_query_syntax(safe_query)
if safe_valid:
clean_query = safe_query
logger.info(f"Applied safe fallback query: {safe_query}")
else:
raise ValueError(f"Cannot create safe fallback query. Original error: {validation_error}")
else:
raise ValueError(f"No tables found for fallback. Original error: {validation_error}")
except Exception as fallback_error:
logger.error(f"Safe fallback failed: {fallback_error}")
raise ValueError(f"Invalid KQL syntax and fallback failed: {validation_error}")
else:
raise ValueError(f"Invalid KQL syntax and insufficient parameters for fallback: {validation_error}")
# Check if query already contains cluster/database specification
has_cluster_spec = "cluster(" in clean_query and "database(" in clean_query
if has_cluster_spec:
# Query already has cluster/database - extract them and use the base query
try:
extracted_cluster, extracted_database = extract_cluster_and_database_from_query(clean_query)
cluster = cluster_uri or extracted_cluster
db = database or extracted_database
except Exception as extract_error:
logger.warning(f"Failed to extract cluster/database: {extract_error}")
cluster = cluster_uri
db = database
else:
# No cluster specification in query - use parameters
cluster = cluster_uri
db = database
# ENHANCED PARAMETER VALIDATION with informative errors
if not cluster:
raise ValueError("Cluster URI must be specified in query or parameters. Example: 'https://help.kusto.windows.net'")
# Check if this is a management command that doesn't require a database
is_mgmt_command = clean_query.strip().startswith('.')
mgmt_commands_no_db = ['.show databases', '.show clusters', '.help']
mgmt_needs_no_db = any(cmd in clean_query.lower() for cmd in mgmt_commands_no_db)
if not db and not (is_mgmt_command and mgmt_needs_no_db):
raise ValueError("Database must be specified in query or parameters. Example: 'Samples'")
# Final safety check for empty query before execution
if not clean_query or not clean_query.strip():
logger.error("Final query is empty after all processing")
raise ValueError("Query became empty after processing")
# Log the normalized query for debugging
if clean_query != original_query:
logger.debug(f"Query normalized from: {original_query[:100]}... to: {clean_query[:100]}...")
# Use "master" database for management commands that don't require specific database
db_for_execution = db if db else "master"
# Execute with enhanced error handling that propagates KustoServiceError
try:
return _execute_kusto_query_sync(clean_query, cluster, db_for_execution)
except KustoServiceError as e:
logger.error(f"Kusto service error during execution: {e}")
raise # Re-raise to be handled by the MCP tool
except Exception as exec_error:
logger.error(f"Generic query execution failed: {exec_error}")
# For non-Kusto errors, return an empty DataFrame to avoid crashing
return pd.DataFrame()
except Exception as e:
logger.error(f"kql_execute_tool failed pre-execution: {e}")
logger.error(f"Original query was: {kql_query if 'kql_query' in locals() else 'Unknown'}")
# Return empty DataFrame for pre-execution failures (e.g., validation)
return pd.DataFrame()
async def _post_execution_learning_bg(query: str, cluster: str, database: str, df: pd.DataFrame):
"""
Enhanced background learning task with automatic schema discovery triggering.
This runs asynchronously to avoid blocking query response.
"""
try:
# Extract table names from the executed query using the enhanced parse_query_entities
from .utils import parse_query_entities
entities = parse_query_entities(query)
tables = entities.get("tables", [])
# If no tables extracted, try fallback extraction
if not tables:
try:
# Fallback: extract table names using simpler pattern matching
table_pattern = r'\b([A-Za-z_][A-Za-z0-9_]*)\s*\|'
fallback_tables = re.findall(table_pattern, query)
if fallback_tables:
tables = fallback_tables[:1] # Take first table found
logger.debug(f"Fallback table extraction found: {tables}")
else:
# Even without table extraction, store successful query globally
logger.debug("No tables extracted but storing successful query globally")
from .memory import get_memory_manager
memory_manager = get_memory_manager()
description = generate_query_description(query)
try:
# Store in global successful queries without table association
memory_manager.add_global_successful_query(cluster, database, query, description)
logger.debug(f"Stored global successful query: {description}")
except Exception as e:
logger.debug(f"Failed to store global successful query: {e}")
return
except Exception as fallback_error:
logger.debug(f"Fallback table extraction failed: {fallback_error}")
return
# Store successful query for each table involved
from .memory import get_memory_manager
memory_manager = get_memory_manager()
description = generate_query_description(query)
for table in tables:
try:
# Add successful query to table-specific memory
memory_manager.add_successful_query(cluster, database, table, query, description)
logger.debug(f"Stored successful query for {table}: {description}")
except Exception as e:
logger.debug(f"Failed to store successful query for {table}: {e}")
# ENHANCED: Force schema discovery for all tables involved in the query
await _ensure_schema_discovered(cluster, database, tables)
except Exception as e:
logger.debug(f"Background learning task failed: {e}")
async def _ensure_schema_discovered(cluster_uri: str, database: str, tables: List[str]):
"""
Force schema discovery if not in memory.
This is the implementation recommended in the analysis.
"""
from .memory import get_memory_manager
from .utils import SchemaManager
memory = get_memory_manager()
schema_manager = SchemaManager(memory)
for table in tables:
try:
# Check if schema exists in memory
schema = memory.get_schema(cluster_uri, database, table, enable_fallback=False)
if not schema or not schema.get("columns"):
# Trigger live discovery with force refresh
logger.info(f"Auto-triggering schema discovery for {database}.{table}")
discovered_schema = await schema_manager.get_table_schema(
cluster_uri, database, table, force_refresh=True
)
if discovered_schema and discovered_schema.get("columns"):
logger.info(f"Successfully auto-discovered schema for {table} with {len(discovered_schema['columns'])} columns")
else:
logger.warning(f"Auto-discovery failed for {table} - no columns found")
else:
logger.debug(f"Schema already exists for {table}, skipping auto-discovery")
except Exception as e:
logger.warning(f"Auto schema discovery failed for {table}: {e}")
# Continue with other tables even if one fails
def get_knowledge_corpus():
"""Backward-compatible wrapper to memory.get_knowledge_corpus"""
try:
from .memory import get_knowledge_corpus as _mem_get_knowledge_corpus
return _mem_get_knowledge_corpus()
except Exception:
# Fallback mock for tests if import fails
class MockCorpus:
def get_ai_context_for_query(self, query):
return {}
return MockCorpus()
async def execute_kql_query(
query: str,
cluster: str = None,
database: str = None,
visualize: bool = False,
use_schema_context: bool = True,
timeout: int = 300
) -> Any:
"""
Legacy compatibility function for __init__.py import.
Returns a list of dictionaries (test compatibility) or dictionary with success/error status.
Enhanced with background learning integration.
Args:
query: KQL query to execute
cluster: Cluster URI (optional)
database: Database name (optional)
visualize: Whether to include visualization (ignored for now)
use_schema_context: Whether to use schema context (ignored for now)
"""
try:
# Optionally load schema context prior to execution (tests may patch get_knowledge_corpus)
if use_schema_context:
try:
corpus = get_knowledge_corpus()
# Call the method so tests can patch and assert it was invoked
_ = corpus.get_ai_context_for_query(query)
except Exception:
# Ignore failures to keep function resilient in test and runtime environments
pass
# Extract cluster and database if not provided
if not cluster or not database:
extracted_cluster, extracted_database = extract_cluster_and_database_from_query(query)
cluster = cluster or extracted_cluster
database = database or extracted_database
if not cluster or not database:
raise ValueError("Query must include cluster and database specification")
# Execute using the core sync function wrapped in asyncio.to_thread and enforce overall timeout
df = await asyncio.wait_for(
asyncio.to_thread(_execute_kusto_query_sync, query, cluster, database, timeout),
timeout=timeout + 5,
)
# Return list format for test compatibility with proper serialization
if hasattr(df, 'to_dict'):
# Convert DataFrame to serializable records
records = []
try:
for _, row in df.iterrows():
record = {}
for col, value in row.items():
if pd.isna(value):
record[col] = None
elif hasattr(value, 'isoformat'): # Timestamp objects
record[col] = value.isoformat()
elif hasattr(value, 'strftime'): # datetime objects
record[col] = value.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(value, type): # type objects
record[col] = value.__name__
elif hasattr(value, 'item'): # numpy types
record[col] = value.item()
else:
record[col] = value
records.append(record)
except Exception as e:
logger.warning(f"DataFrame serialization failed: {e}")
# Fallback to string conversion
records = df.astype(str).to_dict("records")
if visualize and records:
# Add simple visualization marker for tests
records.append({"visualization": "chart_data"})
return records
else:
return []
except Exception as e:
logger.error(f"Query execution failed: {e}")
raise # Re-raise for test compatibility
async def execute_with_full_flow(query: str, user_context: str = None) -> Dict:
"""
Implement complete execution flow with learning as recommended in the analysis.
This implements the expected flow: execute → learn → discover → refine.
"""
try:
# Step 1: Execute initial query
result = await execute_kql_query(query)
# Step 2: Extract and learn from context
if user_context:
context = await extract_context_from_prompt(user_context)
await learn_from_data(result, context)
# Step 3: Trigger background schema discovery
from .utils import parse_query_entities
entities = parse_query_entities(query)
cluster, database, tables = entities["cluster"], entities["database"], entities["tables"]
if cluster and database and tables:
asyncio.create_task(_ensure_schema_discovered(cluster, database, tables))
# Step 4: Generate enhanced query if needed (simplified for now)
enhanced_result = {
"initial_result": result,
"learning_complete": True,
"schema_discovery_triggered": bool(tables),
"entities_extracted": entities
}
return enhanced_result
except Exception as e:
logger.error(f"Full flow execution failed: {e}")
return {
"initial_result": None,
"error": str(e),
"learning_complete": False
}
async def extract_context_from_prompt(user_context: str) -> Dict:
"""Extract meaningful context from user input for learning."""
return {
"user_intent": user_context,
"needs_refinement": len(user_context.split()) > 10, # Simple heuristic
"context_type": "natural_language"
}
async def learn_from_data(result_data: Any, context: Dict):
"""Store learning results in memory for future use."""
try:
from .memory import get_memory_manager
memory = get_memory_manager()
# Convert result to learnable format
if isinstance(result_data, list) and result_data:
learning_data = {
"row_count": len(result_data),
"columns": list(result_data[0].keys()) if result_data else [],
"success": True,
"context": context
}
# Store learning result using the context info
memory.store_learning_result(
query=context.get("user_intent", ""),
result_data=learning_data,
execution_type="enhanced_flow_execution"
)
except Exception as e:
logger.warning(f"Learning from data failed: {e}")