Skip to main content
Glama
dstreefkerk

ms-sentinel-mcp-server

by dstreefkerk

sentinel_logs_search

Search and analyze Azure Sentinel security logs using Kusto Query Language (KQL) queries to investigate security incidents and monitor threats.

Instructions

Run a KQL query against Azure Monitor

Input Schema

Table | JSON Schema
Name | Required | Description | Default
kwargs | Yes | |

Implementation Reference

  • The async run method that executes the KQL query against Azure Monitor Logs using the logs client, handles parameters like query and timespan, processes results into JSON-safe format, and returns structured results or errors.
    async def run(self, ctx: Context, **kwargs):
        """
        Run a KQL query against Azure Monitor Logs.

        Args:
            ctx (Context): The MCP context.
            **kwargs: Should include 'query' and optional 'timespan'.
                The timespan is '<n>d', '<n>h' or '<n>m' (case-insensitive,
                surrounding whitespace ignored) and defaults to '1d'. A value
                with an unrecognized unit falls back to one day and adds an
                entry to the result's 'warnings'.

        Returns:
            dict: Query results and metadata, or error information. Every
            failure payload has ``valid`` set to False, the ``errors`` /
            ``error`` / ``warnings`` / ``message`` fields populated, and
            empty ``columns`` / ``rows``.
        """

        def failure(message, detail=None):
            # Single place that shapes a failure payload so all error paths
            # expose the same keys. `detail` is the longer text used for
            # "errors"/"error" when it differs from the short `message`.
            full_text = detail if detail is not None else message
            return {
                "valid": False,
                "errors": [full_text],
                "error": full_text,
                "result_count": 0,
                "columns": [],
                "rows": [],
                "warnings": [message],
                "message": message,
            }

        def make_json_safe(val):
            # datetime is a subclass of date; both serialize via isoformat().
            if isinstance(val, (datetime, date)):
                return val.isoformat()
            return val

        def get_col_info(col, idx, declared_types):
            # Columns may be objects with name/type/ordinal attributes or
            # plain strings. For plain strings, fall back to the table's
            # parallel type list (when it has one) instead of always
            # reporting "string".
            fallback_type = (
                declared_types[idx] if idx < len(declared_types) else "string"
            )
            return {
                "name": getattr(col, "name", col),
                "type": getattr(
                    col, "type", getattr(col, "column_type", fallback_type)
                ),
                "ordinal": getattr(col, "ordinal", idx),
            }

        # Extract parameters using the centralized parameter extraction from MCPToolBase
        query = self._extract_param(kwargs, "query")
        timespan = self._extract_param(kwargs, "timespan", "1d")
        logger = self.logger

        if not query:
            logger.error("Missing required parameter: query")
            return failure("Missing required parameter: query")

        logs_client, workspace_id = self.get_logs_client_and_workspace(ctx)
        if logs_client is None or workspace_id is None:
            not_ready = "Azure Monitor Logs client or workspace_id is not initialized."
            logger.error(not_ready)
            return failure(
                not_ready,
                not_ready + " Check your credentials and configuration.",
            )

        start_time = time.perf_counter()
        warnings = []
        timespan_obj = None
        if timespan:
            unit_keywords = {"d": "days", "h": "hours", "m": "minutes"}
            try:
                normalized = timespan.strip().lower()
                unit_keyword = unit_keywords.get(normalized[-1:])
                if unit_keyword is not None:
                    timespan_obj = timedelta(
                        **{unit_keyword: int(normalized[:-1])}
                    )
                else:
                    # Keep the historical one-day fallback, but tell the
                    # caller: the echoed "timespan" would otherwise suggest
                    # their value had been honored.
                    timespan_obj = timedelta(days=1)
                    warnings.append(
                        f"Unrecognized timespan {timespan!r}; defaulting to 1 day. "
                        "Use format like '1d', '12h', or '30m'."
                    )
            except (AttributeError, TypeError, ValueError, OverflowError) as e:
                logger.error("Invalid timespan format: %s", e)
                return failure(
                    f"Invalid timespan format: {e}",
                    f"Invalid timespan format: {e}. "
                    "Use format like '1d', '12h', or '30m'.",
                )

        match = re.search(r"\b(take|limit)\s+(\d+)", query, re.IGNORECASE)
        if match:
            n = int(match.group(2))
            if n > 250:
                warnings.append(
                    f"Large result set requested ({n} rows). "
                    "Consider using a smaller limit for better performance."
                )

        try:
            # Execute the query using task manager for async compatibility
            response = await run_in_thread(
                logs_client.query_workspace,
                workspace_id=workspace_id,
                query=query,
                timespan=timespan_obj,
                name=f"query_logs_{hash(query) % 10000}",
            )
            exec_time_ms = int((time.perf_counter() - start_time) * 1000)

            tables = getattr(response, "tables", None) if response else None
            if response and not tables:
                # NOTE(review): azure-monitor-query reports partially
                # successful queries through `partial_data` / `partial_error`
                # rather than `tables` - confirm against the SDK version in
                # use. Surface that data instead of reporting "no tables".
                partial_tables = getattr(response, "partial_data", None)
                if partial_tables:
                    tables = partial_tables
                    warnings.append(
                        "Query returned partial results: "
                        f"{getattr(response, 'partial_error', None)}"
                    )

            if tables:
                # Only the first table is returned to the caller.
                table = tables[0]
                declared_types = list(getattr(table, "columns_types", None) or [])
                columns = [
                    get_col_info(col, idx, declared_types)
                    for idx, col in enumerate(table.columns)
                ]
                dict_rows = [
                    {
                        col["name"]: make_json_safe(cell)
                        for col, cell in zip(columns, row)
                    }
                    for row in table.rows
                ]
                return {
                    "valid": True,
                    "errors": [],
                    "query": query,
                    "timespan": timespan,
                    "result_count": len(dict_rows),
                    "columns": columns,
                    "rows": dict_rows,
                    "execution_time_ms": exec_time_ms,
                    "warnings": warnings,
                    "message": "Query executed successfully",
                }

            return {
                "valid": True,
                "errors": [],
                "query": query,
                "timespan": timespan,
                "result_count": 0,
                "columns": [],
                "rows": [],
                "execution_time_ms": exec_time_ms,
                "warnings": warnings,
                "message": "Query returned no tables or results",
            }

        except TimeoutError:
            logger.error("Query timed out after 60 seconds")
            return failure("Query timed out after 60 seconds")
        except Exception as e:
            # Boundary handler: the tool reports failures as a payload
            # instead of raising into the MCP server.
            logger.error("Error executing logs query: %s", str(e), exc_info=True)
            return failure(f"Error executing query: {str(e)}")
  • Class-level attributes defining the tool's name and description for MCP registration and usage.
    # Tool name used for MCP registration and usage.
    name = "sentinel_logs_search"
    # Short description of what the tool does.
    description = "Run a KQL query against Azure Monitor"
  • Function that registers the SentinelLogsSearchTool (and dummy variant) with the FastMCP server instance.
    def register_tools(mcp: FastMCP):
        """
        Attach the Azure Monitor query tools to an MCP server.

        Each tool class registers itself through its own ``register`` hook,
        in the order listed below.

        Args:
            mcp (FastMCP): The MCP server.
        """
        tool_classes = (
            SentinelLogsSearchTool,
            SentinelLogsSearchWithDummyDataTool,
        )
        for tool_cls in tool_classes:
            tool_cls.register(mcp)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dstreefkerk/ms-sentinel-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.