Skip to main content
Glama
dstreefkerk

ms-sentinel-mcp-server

by dstreefkerk
kql_validator.py6.62 kB
""" FILE: utilities/kql_validator.py DESCRIPTION: Provides offline KQL validation using the Kusto.Language library via Python.NET. """ import os from typing import List, Tuple from utilities.logging import get_tool_logger from utilities.path_utils import find_file, get_project_root logger = get_tool_logger("kql_validator") # Import pythonnet conditionally since it might not be available in all environments try: # Only import what we need import pythonnet PYTHONNET_AVAILABLE = True except ImportError: PYTHONNET_AVAILABLE = False logger.warning("pythonnet not found. Offline KQL validation will be unavailable.") class KQLValidator: """Class for validating KQL syntax without connecting to Azure.""" def __init__(self): """Initialize the KQL validator.""" self.initialized = False self.dll_path = None self.kusto_language_loaded = False # Reference to KustoCode class, to be populated during initialization self.kusto_code = None # Try to initialize if pythonnet is available if PYTHONNET_AVAILABLE: self._initialize() def _initialize(self) -> bool: """ Initialize the Kusto.Language assembly with minimal dependencies. Returns: bool: True if initialization succeeded, False otherwise. """ if self.initialized: return True # Use the path_utils to find the DLL in various locations dll_filename = "Kusto.Language.dll" search_dirs = ["lib", os.path.join("utilities", "lib"), ".", get_project_root()] self.dll_path = find_file(dll_filename, search_dirs) if not self.dll_path: logger.warning( "Kusto.Language.dll not found. Offline validation will be unavailable." ) return False logger.info("Found Kusto.Language.dll at: %s", self.dll_path) try: # Initialize Pythonnet runtime logger.info("Initializing Pythonnet runtime") pythonnet.load() logger.info("Pythonnet runtime initialized") # Use System.Reflection to load the assembly logger.info("Loading assembly from: %s", self.dll_path) try: from System import Reflection # type: ignore # pylint: disable=import-outside-toplevel # noqa: E402 except ImportError as e: logger.error("Unable to import System.Reflection: %s", e, exc_info=True) return False Reflection.Assembly.LoadFrom(self.dll_path) logger.info("Assembly loaded via System.Reflection") # Import only the essential KustoCode class for validation # This is more resilient to changes in the DLL structure try: from Kusto.Language import KustoCode # type: ignore # pylint: disable=import-outside-toplevel # noqa: E402 # Store reference to class for later use self.kusto_code = KustoCode # Test if we can parse a query test_query = "Heartbeat | take 10" test_code = KustoCode.Parse(test_query) diagnostics = list(test_code.GetDiagnostics()) logger.info( "KQL test parse successful with %d diagnostics", len(diagnostics) ) self.kusto_language_loaded = True self.initialized = True logger.info("Successfully initialized Kusto.Language library") return True except ImportError as e: logger.error( "Error importing Kusto.Language.KustoCode: %s", e, exc_info=True ) return False except Exception as e: logger.error("Error initializing Kusto.Language: %s", e, exc_info=True) return False def validate_query(self, query_text: str) -> Tuple[bool, List[str]]: """ Validate a KQL query syntax without connecting to Azure. Args: query_text: The KQL query text to validate. Returns: Tuple[bool, List[str]]: (is_valid, list_of_error_messages) """ if not PYTHONNET_AVAILABLE: return False, ["Python.NET not available. Unable to validate KQL."] if not self.initialized and not self._initialize(): return False, ["KQL validator not initialized. Cannot validate query."] try: # Parse the query using KustoCode.Parse code = self.kusto_code.Parse(query_text) # type: ignore # Get diagnostics (syntax errors) diagnostics = list(code.GetDiagnostics()) if diagnostics: # Convert diagnostics to readable error messages error_messages = [] for diag in diagnostics: # Extract line and position if available try: line_pos = ( f"Line {diag.Start.Line}, Position {diag.Start.Column}" ) except Exception: line_pos = "Unknown position" # Extract message if available try: message = diag.Message except Exception: message = str(diag) error_messages.append(f"{line_pos}: {message}") return False, error_messages return True, [] except Exception as e: return False, [f"Error validating query: {str(e)}"] # Singleton instance of the validator _VALIDATOR = None def get_validator() -> KQLValidator: """ Get the KQLValidator singleton instance. Returns: KQLValidator: The validator instance. """ global _VALIDATOR # pylint: disable=global-statement if _VALIDATOR is None: _VALIDATOR = KQLValidator() return _VALIDATOR def validate_kql(query: str) -> Tuple[bool, List[str]]: """ Validate a KQL query. Args: query: The KQL query to validate. Returns: Tuple[bool, List[str]]: (is_valid, list_of_error_messages) """ validator = get_validator() if not validator.initialized: return False, [ "KQL validation unavailable: Could not initialize validator.", "For syntax validation, please use the query tool to validate against your workspace.", ] return validator.validate_query(query)

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dstreefkerk/ms-sentinel-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server