Skip to main content
Glama

MCP KQL Server

constants.py•29.8 kB
""" Constants for the MCP KQL Server. This module contains all the constants used throughout the MCP KQL Server, including default values, configuration settings, error messages, and other static values. Author: Arjun Trivedi Email: arjuntrivedi42@yahoo.com """ import os import re from pathlib import Path from typing import Any, Dict, List # Version information __version__ = "2.0.7" VERSION = __version__ MCP_PROTOCOL_VERSION = "2024-11-05" # Server configuration SERVER_NAME = f"mcp-kql-server({__version__})" SERVER_VERSION = __version__ FEATURES = [ { "title": "Security-focused table & column pattern recognition", "summary": ( "Pattern and semantic-based detection of security-related tables/columns using " "configurable regexes/heuristics and optional ML classification. Safe defaults " "prevent logging of sensitive values." ), }, { "title": "Real-time query visualization and result formatting", "summary": ( "Progressive/streaming rendering of large result sets with configurable row/column " "limits and pluggable visualization backends to avoid large-memory buffering." ), }, { "title": "FastMCP: low-latency, high-throughput protocol", "summary": ( "Optimized implementation with connection pooling, optional compression, " "backpressure-aware streaming, and efficient serialization to minimize latency " "and memory usage." ), }, ] def _format_features_list(features: List[Dict[str, str]]) -> str: """Format features into bullet lines with safe truncation and fallbacks.""" if not features: return ( "- Security-focused table and column pattern recognition\n" "- Real-time query visualization and result formatting\n" "- FastMCP implementation for optimal performance" ) lines = [] for f in features: # Defensive conversions & truncation to avoid extremely long lines title = str(f.get("title", "<unnamed feature>"))[:120] summary = str(f.get("summary", "")).strip() if len(summary) > 300: summary = summary[:297] + "..." lines.append(f"- {title}: {summary}") return "\n".join(lines) # Build description once at import time to avoid repeated formatting at runtime. _SERVER_FEATURE_BULLETS = _format_features_list(FEATURES) SERVER_DESCRIPTION = f"""AI-Enhanced KQL Server for Cybersecurity Analytics An intelligent Model Context Protocol (MCP) server that provides advanced KQL query execution capabilities with AI-powered schema discovery and intelligent security analytics assistance. Key Features: {_SERVER_FEATURE_BULLETS} Perfect for SOC analysts, threat hunters, and security researchers working with Azure Data Explorer.""" # FastAPI Configuration FASTAPI_TITLE = "MCP KQL Local AI Agent" FASTAPI_VERSION = "2.0.7" FASTAPI_DESCRIPTION = "Local AI-powered KQL Server with no external dependencies" FASTAPI_HOST = "0.0.0.0" FASTAPI_PORT = 8000 # Tool Configuration TOOL_KQL_EXECUTE_NAME = "kql_execute" TOOL_KQL_EXECUTE_DESCRIPTION = """Execute KQL (Kusto Query Language) queries for Azure Data Explorer, Application Insights, Log Analytics, and other Kusto databases. **Use this tool when users ask about:** - KQL queries, Kusto queries, Azure Data Explorer queries - Log Analytics queries, Application Insights queries - Searching logs, events, metrics, telemetry data - Data analysis, security investigations, performance monitoring - Questions about tables like SecurityEvent, Heartbeat, Perf, Event, Syslog - Time-based queries (last hour, today, past week, etc.) - Aggregations, statistics, trends, patterns in data - Converting natural language to KQL or executing direct KQL **Examples of queries to use this tool for:** - "Show security events from last hour" - "Find failed login attempts" - "Get performance data trends" - "Search application logs for errors" - "cluster('<cluster>').database('<database>').<table> | take 10" Supports both direct KQL execution and natural language to KQL conversion with intelligent schema detection.""" TOOL_KQL_SCHEMA_NAME = "kql_schema_memory" TOOL_KQL_SCHEMA_DESCRIPTION = """Discover database schemas, table structures, and generate KQL queries using AI-powered schema intelligence. **Use this tool when users need:** - Schema discovery for Kusto/Azure Data Explorer databases - Table structure information and column details - Smart KQL query generation from natural language - Context-aware query suggestions based on available data - AI-enhanced schema analysis and pattern matching **Examples:** - "What tables are available in this database?" - "Generate a query to find security incidents" - "Help me understand the schema structure" - "Convert this natural language to KQL with proper table/column names" This tool uses local AI to provide intelligent schema discovery and KQL generation without external API dependencies.""" # KQL Validation tool removed - not implemented in current server # Default configuration values - completely data-driven approach DEFAULT_CLUSTER = None # Will be extracted from user's KQL query DEFAULT_DATABASE = None # Will be discovered from query execution DEFAULT_TABLE = None # Will be discovered from schema analysis # Default paths and directories DEFAULT_MEMORY_DIR_NAME = "KQL_MCP" DEFAULT_CLUSTER_MEMORY_DIR = "cluster_memory" SCHEMA_FILE_EXTENSION = ".json" # Azure and KQL configuration DEFAULT_KUSTO_DOMAIN = "kusto.windows.net" SYSTEM_DATABASES = {"$systemdb"} DEFAULT_CONNECTION_TIMEOUT = 60 DEFAULT_QUERY_TIMEOUT = 600 # Schema memory configuration SCHEMA_CACHE_MAX_AGE_DAYS = 7 MAX_SCHEMA_FILE_SIZE_MB = 10 MAX_TABLES_PER_DATABASE = 1000 MAX_COLUMNS_PER_TABLE = 500 # Query validation MAX_QUERY_LENGTH = 100000 MIN_QUERY_LENGTH = 10 # Default configuration (no environment variables required) DEFAULT_CONFIG = { "DEBUG_MODE": False, "AZURE_ERRORS_ONLY": True, "CONNECTION_TIMEOUT": DEFAULT_CONNECTION_TIMEOUT, "QUERY_TIMEOUT": DEFAULT_QUERY_TIMEOUT, "AUTO_CREATE_MEMORY_PATH": True, "ENABLE_LOGGING": True, } # Caching and strategy defaults (sensible fallbacks for missing configuration) CACHE_STRATEGIES = { # Maximum number of schema entries to retain in memory (LRU eviction) "SCHEMA_CACHE_SIZE": 1000, # Maximum number of pattern analysis entries to retain "PATTERN_CACHE_SIZE": 500, # Column mapping cache size "COLUMN_MAPPING_CACHE_SIZE": 500, } # Enhanced error handling configuration used by discovery/fetch routines ENHANCED_ERROR_HANDLING = { "MAX_RETRIES": 3, "BASE_RETRY_DELAY": 1.0, "RETRY_BACKOFF_FACTOR": 2.0, } # Memory optimization controls used by AI token generation and context management MEMORY_OPTIMIZATION = { "MAX_CONTEXT_SIZE": 4000, "AI_TOKEN_COMPRESSION": True, "TRIM_AI_TOKENS_TO": 1024, } # Performance tuning config (used as default values across modules) PERFORMANCE_CONFIG = { "SCHEMA_CACHE_TTL_HOURS": 24, } # Thread safety/limits defaults THREAD_SAFETY_CONFIG = { "USE_THREAD_LOCKS": True, "MAX_THREADS": 50, } # File and directory permissions FILE_PERMISSIONS = { "schema_file": 0o600, "memory_dir": 0o700, } # Limits and constraints LIMITS = { "max_concurrent_queries": 5, "max_result_rows": 10000, "max_visualization_rows": 1000, "max_column_description_length": 500, "max_table_description_length": 1000, "max_retry_attempts": 3, "retry_delay_seconds": 2, } # KQL operators that definitively indicate a KQL query KQL_OPERATORS = frozenset({ "count", "datatable", "distinct", "evaluate", "extend", "facet", "find", "getschema", "invoke", "join", "let", "limit", "lookup", "make-series", "materialize", "mv-expand", "order", "parse", "print", "project", "project-away", "project-rename", "range", "render", "sample", "sample-distinct", "sort", "summarize", "take", "top", "union", "where", }) # Natural language indicators NL_INDICATORS = { "show", "find", "what", "how", "when", "where", "who", "which", "list", "get", "can", "could", "would", "should", "tell", "give", "display", "search", "look", "help", "explain", "describe", } # Question patterns for natural language detection QUESTION_PATTERNS = [ r"\?$", r"^(what|how|when|where|who|which|why)\s+", r"^(can|could|would|should)\s+you\s+", r"^(show|find|get|list)\s+me\s+", ] # KQL syntax patterns for definitive KQL detection # Note: Use raw regex strings here (not precompiled) so callers can compile with desired flags. KQL_SYNTAX_PATTERNS = [ r"\|\s*(project|extend|where|summarize|join|find|let|evaluate|render|take|sort|order|top|count|distinct|mv-expand|make-series|parse)\b", r"ago\s*\(\s*\d+[dhms]\s*\)", r"between\s*\(.*?\.\..*?\)", r"\s(has|contains|startswith|endswith|==|=~)\s", r"^\s*let\s+", r"datatable\s*\(", r"(cluster|database)\s*\(\s*['\"].*?['\"]\s*\)", ] # Classification confidence threshold KQL_CONFIDENCE_THRESHOLD = 0.7 # Score weights for classification KQL_SYNTAX_SCORE_WEIGHT = 2.0 KQL_OPERATOR_SCORE_WEIGHT = 3.0 NL_QUESTION_SCORE_WEIGHT = 2.0 NL_INDICATOR_SCORE_WEIGHT = 1.5 KQL_TABLE_SCORE_WEIGHT = 1.0 NL_CONVERSATIONAL_SCORE_WEIGHT = 1.0 KQL_CONVERSATIONAL_PENALTY = -0.5 # Memory configuration - default memory path under %APPDATA%/KQL_MCP on Windows; fallback to user's home when not set DEFAULT_MEMORY_PATH = str(Path(os.environ.get("APPDATA", str(Path.home()))) / "KQL_MCP" / "kql_mcp_memory.json") # Query execution configuration DEFAULT_MAX_RETRIES = 3 DEFAULT_TIMEOUT = 30 DEFAULT_PREVIEW_ROWS = 100 DEFAULT_VISUALIZATION_ROWS = 10 # Agent routing configuration ROUTE_KQL = "KQL" ROUTE_NL = "NL" # Status messages STATUS_SUCCESS = "success" STATUS_ERROR = "error" STATUS_LEARNING = "learning" STATUS_HEALTHY = "healthy" STATUS_OPERATIONAL = "operational" STATUS_DEGRADED = "degraded" # Agent capabilities CAPABILITY_FAST_PATH = "Direct KQL execution" CAPABILITY_SMART_PATH = "Natural language to KQL with local AI" # Logging configuration LOG_LEVEL = "INFO" LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" # Environment variables ENV_AZURE_CORE_ONLY_SHOW_ERRORS = "AZURE_CORE_ONLY_SHOW_ERRORS" ENV_FASTMCP_QUIET = "FASTMCP_QUIET" ENV_FASTMCP_NO_BANNER = "FASTMCP_NO_BANNER" ENV_FASTMCP_SUPPRESS_BRANDING = "FASTMCP_SUPPRESS_BRANDING" ENV_FASTMCP_NO_LOGO = "FASTMCP_NO_LOGO" ENV_FASTMCP_SILENT = "FASTMCP_SILENT" ENV_NO_COLOR = "NO_COLOR" ENV_PYTHONIOENCODING = "PYTHONIOENCODING" # Note: Monitoring system removed - configurations no longer needed # Special tokens for knowledge corpus and AI-driven query chaining SPECIAL_TOKENS = { "CLUSTER_START": "<CLUSTER>", "CLUSTER_END": "</CLUSTER>", "DATABASE_START": "<DATABASE>", "DATABASE_END": "</DATABASE>", "TABLE_START": "<TABLE>", "TABLE_END": "</TABLE>", "COLUMN_START": "<COLUMN>", "COLUMN_END": "</COLUMN>", "PATTERN_START": "<PATTERN>", "PATTERN_END": "</PATTERN>", "QUERY_START": "<QUERY>", "QUERY_END": "</QUERY>", "RESULT_START": "<RESULT>", "RESULT_END": "</RESULT>", "CONTEXT_START": "<CONTEXT>", "CONTEXT_END": "</CONTEXT>", "MEMORY_START": "<MEMORY>", "MEMORY_END": "</MEMORY>", "AI_START": "<AI>", "AI_END": "</AI>", "SEPARATOR": "<SEP>", "PAD_TOKEN": "<PAD>", "UNK_TOKEN": "<UNK>", "MASK_TOKEN": "<MASK>", # Metadata extraction and context tuning tokens "METADATA_START": "<METADATA>", "METADATA_END": "</METADATA>", "EXECUTION_RESULT_START": "<EXEC_RESULT>", "EXECUTION_RESULT_END": "</EXEC_RESULT>", "PLAN_START": "<PLAN>", "PLAN_END": "</PLAN>", "CHAIN_START": "<CHAIN>", "CHAIN_END": "</CHAIN>", "FEEDBACK_START": "<FEEDBACK>", "FEEDBACK_END": "</FEEDBACK>", "LEARNING_START": "<LEARNING>", "LEARNING_END": "</LEARNING>", "OPTIMIZATION_START": "<OPTIMIZATION>", "OPTIMIZATION_END": "</OPTIMIZATION>", "SCHEMA_TRIGGER_START": "<SCHEMA_TRIGGER>", "SCHEMA_TRIGGER_END": "</SCHEMA_TRIGGER>", # Compact AI-friendly tokens (backwards-compatible aliases) "CLUSTER": "@@CLUSTER@@", "DATABASE": "##DATABASE##", "TABLE": "##TABLE##", "COLUMN": "::COLUMN::", "TYPE": ">>TYPE<<", } # Comprehensive set of KQL reserved words and keywords. KQL_RESERVED_WORDS = frozenset({ 'and', 'as', 'by', 'contains', 'count', 'distinct', 'extend', 'false', 'from', 'group', 'has', 'in', 'join', 'let', 'limit', 'not', 'on', 'or', 'order', 'project', 'sort', 'startswith', 'summarize', 'take', 'then', 'top', 'true', 'union', 'where', 'with', 'avg', 'max', 'min', 'sum', 'between', 'endswith', 'having', 'kind', 'show', 'type', 'inner', 'outer', 'anti', 'leftanti', 'getschema', 'string', 'long', 'int', 'datetime', 'timespan', 'real', 'bool', 'dynamic', 'guid', 'decimal', 'render', 'evaluate', 'parse', 'mv-expand', 'make-series', 'range', 'datatable', 'print', 'sample', 'sample-distinct', 'facet', 'find', 'invoke', 'lookup', 'materialize', 'fork', 'partition', 'serialize', 'toscalar', 'topnested', 'scan', 'search', 'externaldata', 'cluster', 'database', 'table' }) # Deprecated: Maintained for backward compatibility, use KQL_RESERVED_WORDS instead. KQL_RESERVED_KEYWORDS = list(KQL_RESERVED_WORDS) # KQL functions for AI analysis KQL_FUNCTIONS = [ "ago", "now", "datetime", "timespan", "bin", "floor", "ceiling", "round", "abs", "exp", "log", "log10", "pow", "sqrt", "sin", "cos", "tan", "strlen", "substring", "split", "strcat", "replace", "trim", "tolower", "toupper", "startswith", "endswith", "contains", "matches", "extract", "extractall", "toint", "tolong", "todouble", "tostring", "tobool", "todatetime", "totimespan", "isnull", "isnotnull", "isempty", "isnotempty", "iif", "case", "iff", "array_length", "array_slice", "array_split", "pack_array", "pack", "bag_keys", "bag_merge", "treepath", "zip", "repeat", "range", ] # KQL query patterns for AI analysis KQL_QUERY_PATTERNS = { # Matches most common filter operations, including various operators and value types. "basic_filter": re.compile(r"\|\s*where\s+\w+\s+(?:==|!=|>=|<=|>|<|contains|has|!contains|!has|startswith)\s+"), # Specifically identifies time-based filtering using the ago() function. "time_filter": re.compile(r"\|\s*where\s+\w+\s*(?:>|>=|<|<=|==|!=)\s*ago\s*\("), # Identifies aggregation, capturing both simple counts and grouped summaries with "by". "aggregation": re.compile(r"\|\s*summarize\b.*?\bby\b"), # Matches the main data shaping operators: project, project-away, and project-rename. "projection": re.compile(r"\|\s*(?:project|project-away|project-rename)\b"), # Catches calculated column creation with the "extend" operator. "extend_column": re.compile(r"\|\s*extend\b\s+\w+\s*="), # Identifies join operations, including the common "on" keyword. "join": re.compile(r"\|\s*(?:join|lookup)\b.*?\bon\b"), # Finds sorting operations. "sorting": re.compile(r"\|\s*(?:order|sort)\s+by\b"), # Matches operators that limit the number of results. "limit_results": re.compile(r"\|\s*(?:take|limit|top)\b\s+\d+"), # Specifically identifies the "top N by column" pattern. "top_by_column": re.compile(r"\|\s*top\s+\d+\s+by\b"), } # Test configuration for unit tests TEST_CONFIG = { "mock_cluster_uri": "help.kusto.windows.net", "mock_database": "Samples", "mock_table": "StormEvents" } def get_data_dir() -> Path: """Get the data directory for storing corpus and other data files.""" if os.name == "nt": # Windows base_dir = Path(os.environ.get("APPDATA", Path.home())) data_dir = base_dir / "KQL_MCP" else: # Linux/macOS base_dir = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local" / "share")) data_dir = base_dir / "mcp-kql-server" data_dir.mkdir(parents=True, exist_ok=True) return data_dir # Dynamic Schema Analysis Configuration class DynamicSchemaAnalyzer: """ Replace static patterns with dynamic analysis. This class provides methods for dynamic table and column analysis. """ @staticmethod def analyze_table_semantics(table_name: str, sample_data: List[Dict] = None) -> Dict[str, Any]: """ Dynamically analyze table purpose from actual data patterns. Replaces static keyword-based table categorization. """ if not table_name: return {"purpose": "unknown", "confidence": 0.0} # Dynamic analysis based on data patterns analysis = { "purpose": "data_table", "confidence": 0.5, "temporal_columns": [], "key_columns": [], "relationships": [] } # Analyze sample data if available if sample_data and len(sample_data) > 0: analysis.update(DynamicSchemaAnalyzer._analyze_data_patterns(sample_data)) # Table name pattern analysis (dynamic, not hardcoded keywords) analysis.update(DynamicSchemaAnalyzer._analyze_table_name_patterns(table_name)) return analysis @staticmethod def _analyze_data_patterns(sample_data: List[Dict]) -> Dict[str, Any]: """Analyze actual data patterns to determine table characteristics.""" if not sample_data: return {} patterns = { "has_timestamps": False, "has_identifiers": False, "has_metrics": False, "data_quality": "unknown" } # Check for common column patterns in data first_row = sample_data[0] if sample_data else {} for column, value in first_row.items(): if value is None: continue value_str = str(value) # Detect temporal patterns if DynamicSchemaAnalyzer._looks_like_timestamp(value_str): patterns["has_timestamps"] = True # Detect identifier patterns if DynamicSchemaAnalyzer._looks_like_identifier(value_str): patterns["has_identifiers"] = True # Detect metric patterns if DynamicSchemaAnalyzer._looks_like_metric(value_str): patterns["has_metrics"] = True return patterns @staticmethod def _analyze_table_name_patterns(table_name: str) -> Dict[str, Any]: """Analyze table name for semantic patterns without hardcoded keywords.""" name_lower = table_name.lower() # Dynamic pattern recognition patterns = { "table_type": "general", "likely_temporal": False, "likely_transactional": False } # Use pattern length and structure analysis instead of keywords if len(name_lower) > 10 and any(sep in name_lower for sep in ['_', '-']): patterns["table_type"] = "structured" # Check for timestamp-like naming patterns if any(time_indicator in name_lower for time_indicator in ['time', 'date', 'log', 'event']): patterns["likely_temporal"] = True # Check for transaction-like patterns if name_lower.endswith(('s', 'es', 'ies')): # Plural forms often indicate collections patterns["likely_transactional"] = True return patterns @staticmethod def _looks_like_timestamp(value: str) -> bool: """Dynamic timestamp detection.""" import re timestamp_patterns = [ r'\d{4}-\d{2}-\d{2}', # Date pattern r'\d{2}:\d{2}:\d{2}', # Time pattern r'^\d{10,13}$', # Unix timestamp ] return any(re.search(pattern, value) for pattern in timestamp_patterns) @staticmethod def _looks_like_identifier(value: str) -> bool: """Dynamic identifier detection.""" import re id_patterns = [ r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', # UUID r'^[0-9a-f]{32}$', # MD5 hash r'^[A-Z0-9]{8,}$', # Uppercase alphanumeric ID ] return any(re.match(pattern, value, re.IGNORECASE) for pattern in id_patterns) @staticmethod def _looks_like_metric(value: str) -> bool: """Dynamic metric detection.""" try: float(value.replace(',', '')) return True except ValueError: return False # Dynamic Column Analysis Configuration class DynamicColumnAnalyzer: """ Replace static column patterns with dynamic analysis. """ @staticmethod def generate_column_tags(column_name: str, data_type: str, sample_values: List[str] = None) -> List[str]: """ Generate smart column tags based on data type and patterns, not static keywords. Replaces the static keyword-based approach in memory.py and utils.py. """ tags = [] # Dynamic data type analysis type_lower = data_type.lower() if data_type else "" # Type-based tags (primary categorization) if any(t in type_lower for t in ['datetime', 'timestamp', 'date', 'time']): tags.append("TEMPORAL") elif any(t in type_lower for t in ['int', 'long', 'short', 'byte']): tags.append("INTEGER") elif any(t in type_lower for t in ['real', 'double', 'float', 'decimal']): tags.append("DECIMAL") elif any(t in type_lower for t in ['bool', 'boolean']): tags.append("BOOLEAN") elif any(t in type_lower for t in ['guid', 'uuid']): tags.append("IDENTIFIER") elif any(t in type_lower for t in ['dynamic', 'json', 'object']): tags.append("STRUCTURED") elif any(t in type_lower for t in ['string', 'text', 'varchar', 'char']): tags.append("TEXT") # Dynamic sample value analysis if sample_values: value_tags = DynamicColumnAnalyzer._analyze_sample_values(sample_values) tags.extend(value_tags) # Dynamic column name analysis (pattern-based, not keyword matching) name_tags = DynamicColumnAnalyzer._analyze_column_name_patterns(column_name) tags.extend(name_tags) return list(dict.fromkeys(tags))[:3] # Remove duplicates and limit to 3 tags @staticmethod def _analyze_sample_values(sample_values: List[str]) -> List[str]: """Analyze sample values to determine column characteristics.""" tags = [] if not sample_values: return tags # Check if all values are unique (likely identifier) if len(set(sample_values)) == len(sample_values): tags.append("UNIQUE_VALUES") # Check if values are categorizable (limited set) if len(set(sample_values)) <= len(sample_values) * 0.5: tags.append("CATEGORICAL") # Check for numerical patterns try: numeric_values = [float(str(v).replace(',', '')) for v in sample_values if v] if numeric_values: if all(v.is_integer() for v in numeric_values): tags.append("COUNT_LIKE") else: tags.append("MEASUREMENT_LIKE") except (ValueError, AttributeError): pass return tags @staticmethod def _analyze_column_name_patterns(column_name: str) -> List[str]: """Analyze column name patterns dynamically.""" tags = [] name_lower = column_name.lower() # Dynamic pattern analysis if name_lower.endswith('id') or name_lower.endswith('_id'): tags.append("ID_COLUMN") if 'time' in name_lower or 'date' in name_lower: tags.append("TIME_COLUMN") if name_lower.startswith('is_') or name_lower.startswith('has_'): tags.append("BOOLEAN_FLAG") if any(sep in name_lower for sep in ['_count', '_total', '_sum']): tags.append("AGGREGATION_COLUMN") return tags # Replace static patterns with dynamic factory functions def get_dynamic_table_analyzer() -> DynamicSchemaAnalyzer: """Get dynamic table analyzer instance.""" return DynamicSchemaAnalyzer() def get_dynamic_column_analyzer() -> DynamicColumnAnalyzer: """Get dynamic column analyzer instance.""" return DynamicColumnAnalyzer() # Monitoring system removed - no placeholder functions needed # Query Chaining Configuration QUERY_CHAINING_CONFIG = { "enable_query_chaining": True, "enable_background_schema_discovery": True, "enable_context_aware_planning": True, "enable_execution_result_feedback": True, "max_chain_length": 5, "chain_timeout_seconds": 300, "context_window_size": 1000, "enable_adaptive_throttling": True, } # Network Connection Configuration CONNECTION_CONFIG = { "max_retries": 5, "retry_delay": 2.0, "retry_backoff_factor": 2.0, "max_retry_delay": 60.0, "connection_timeout": 30.0, "read_timeout": 300.0, "total_timeout": 600.0, "enable_connection_pooling": True, "pool_max_size": 10, "pool_block": False, "validate_connection_before_use": True, "connection_validation_timeout": 5.0, } # Error Handling Configuration ERROR_HANDLING_CONFIG = { "enable_graceful_degradation": True, "isolate_thread_errors": True, "fallback_strategies": ["cached_schema", "query_derived_schema", "minimal_schema"], "error_recovery_timeout": 30.0, "max_consecutive_failures": 3, "failure_recovery_delay": 60.0, "enable_circuit_breaker": True, "circuit_breaker_threshold": 5, "circuit_breaker_recovery_time": 120.0, } # Retryable Error Patterns - Network and connection errors RETRYABLE_ERROR_PATTERNS = [ # Socket and connection errors r"SocketException.*?10060", # Connection timed out r"ConnectionError", r"TimeoutError", r"timeout", r"Connection refused", r"Connection reset", r"Connection aborted", r"Network is unreachable", r"Host is unreachable", # gRPC specific errors r"grpc.*?unavailable", r"grpc.*?deadline_exceeded", r"grpc.*?resource_exhausted", r"subchannel.*?connection.*?failed", r"DNS resolution failed", # Kusto/Azure specific transient errors r"ServiceNotAvailable", r"InternalServerError", r"TooManyRequests", r"ThrottlingError", r"TemporaryFailure", r"ClusterNotFound.*?temporarily", # Authentication token refresh errors r"TokenExpired", r"AuthenticationFailed.*?retry", r"Invalid.*?token.*?refresh", ] # Non-Retryable Error Patterns - Permanent failures NON_RETRYABLE_ERROR_PATTERNS = [ r"Unauthorized", r"Forbidden", r"NotFound.*?permanent", r"InvalidQuery", r"SyntaxError", r"ValidationError", r"SchemaError", r"PermissionDenied", r"QuotaExceeded.*?permanent", ] BACKGROUND_SCHEMA_CONFIG = { "enable_trigger_based_discovery": True, "discovery_trigger_conditions": { "new_table_reference": True, "unknown_column_usage": True, "query_execution_success": True, "schema_validation_failure": True, "context_gap_detected": True, }, } # Types of operations that can be orchestrated via query chaining CHAINING_OPERATION_TYPES = [ "discover_schema", "update_schema", "generate_kql", "execute_kql", "persist_memory", ] def get_query_chaining_config() -> dict: """Return the active chaining configuration (fallback to defaults).""" return QUERY_CHAINING_CONFIG def get_chaining_threshold(default: float = 0.5) -> float: """Return the chaining threshold used to decide whether to trigger chaining behavior.""" try: return float(QUERY_CHAINING_CONFIG.get("chaining_threshold", default)) except Exception: return default def is_chaining_feature_enabled(feature_name: str, config_section: str = "chaining") -> bool: config = QUERY_CHAINING_CONFIG if config_section == "chaining" else {} return bool(config.get(feature_name, False)) def should_trigger_background_schema_discovery(trigger_type: str) -> bool: if not is_chaining_feature_enabled("enable_background_schema_discovery"): return False trigger_conditions = BACKGROUND_SCHEMA_CONFIG.get("discovery_trigger_conditions", {}) return trigger_conditions.get(trigger_type, False) # Precompiled regex sets for reuse across modules (reduce repeated compilation) KQL_SYNTAX_PATTERNS_COMPILED = [re.compile(p, re.IGNORECASE) for p in KQL_SYNTAX_PATTERNS] QUESTION_PATTERNS_COMPILED = [re.compile(p, re.IGNORECASE) for p in QUESTION_PATTERNS] def is_performance_feature_enabled(feature_name: str) -> bool: """Return whether a performance-related feature is enabled (case-insensitive).""" try: return bool(PERFORMANCE_CONFIG.get(feature_name.upper(), False)) except Exception: return False # Default await timeout for async wrappers that run sync work in background threads DEFAULT_AWAIT_TIMEOUT = 30

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/4R9UN/mcp-kql-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server