Skip to main content
Glama
dstreefkerk

ms-sentinel-mcp-server

by dstreefkerk
kql_tools.py (3.6 kB)
"""
FILE: tools/kql_tools.py

Provides KQL validation and query tools for the MCP server.

This module defines tools for validating KQL (Kusto Query Language) syntax
in the context of the MCP server. All tools are MCPToolBase compliant and are
suitable for both server and direct invocation (e.g., integration tests).
"""

from mcp.server.fastmcp import Context, FastMCP

from tools.base import MCPToolBase
from utilities.kql_validator import validate_kql


class KQLValidateTool(MCPToolBase):
    """
    Tool for validating KQL (Kusto Query Language) syntax locally.

    This tool checks the syntax of a provided KQL query without executing it
    against an Azure Log Analytics Workspace. It is intended for both MCP
    server use and direct invocation in integration tests.

    Attributes:
        name (str): The name of the tool.
        description (str): A brief description of the tool.

    Methods:
        run: Validate a KQL query and return the result.
    """

    name = "sentinel_query_validate"
    description = "Validate KQL Query Syntax locally"

    def dummy_public_method(self) -> None:
        """
        Dummy public method to satisfy pylint's too-few-public-methods warning.
        """
        return None

    async def run(self, ctx: Context, **kwargs) -> dict:
        """
        Validate a KQL query and return the result.

        Args:
            ctx (Context): The context of the MCP server. If it exposes a
                callable ``warning`` attribute, validation failures are also
                reported through it on a best-effort basis.
            **kwargs: Additional keyword arguments; the ``query`` parameter
                is read from here via ``_extract_param``.

        Returns:
            dict: A dictionary containing the validation result. On success
            it holds ``result``, ``valid=True`` and an empty ``errors`` list.
            On any failure (missing query, invalid query, or an exception
            raised while validating) it holds ``error``, ``valid=False`` and
            an ``errors`` list.
        """
        # Extract query using the centralized parameter extraction from MCPToolBase
        query = self._extract_param(kwargs, "query")
        logger = self.logger

        if not query:
            missing = "Missing required parameter: query"
            logger.error(missing)
            return {"error": missing, "valid": False, "errors": [missing]}

        try:
            is_valid, errors = validate_kql(query)
            # Built inside the try so that a malformed validator result
            # (e.g. non-string entries) is reported through the same error
            # path as a validator crash.
            error_message = (
                None if is_valid else "KQL validation failed:\n" + "\n".join(errors)
            )
        except Exception as e:
            logger.error("Error validating KQL query: %s", e, exc_info=True)
            return {
                "error": (
                    "An error occurred while validating the query. "
                    "Try validating code by executing a KQL query against the "
                    f"workspace instead: {e!s}"
                ),
                "valid": False,
                "errors": [str(e)],
            }

        if is_valid:
            return {
                "result": (
                    "Query validation passed. "
                    "The KQL syntax appears to be correct."
                ),
                "valid": True,
                "errors": [],
            }

        # Warn via context if available. This is best-effort: a failure to
        # deliver the warning must not replace the real validation errors
        # with a generic "error occurred" payload.
        warn = getattr(ctx, "warning", None)
        if callable(warn):
            try:
                await warn(error_message)
            except Exception as warn_exc:
                logger.warning(
                    "Could not send KQL validation warning via context: %s",
                    warn_exc,
                )

        # "KQL validation unavailable" (validator initialization) errors are
        # reported with the same payload shape as ordinary syntax errors.
        return {"error": error_message, "valid": False, "errors": errors}


def register_tools(mcp: FastMCP) -> None:
    """
    Register KQL tools with the MCP server.

    Args:
        mcp (FastMCP): The MCP server instance to register tools with.
    """
    KQLValidateTool.register(mcp)

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dstreefkerk/ms-sentinel-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.