sentinel_logs_search
Query Azure Sentinel logs using KQL to analyze security data and investigate incidents.
Instructions
Run a KQL query against Azure Monitor
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
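| kwargs | Yes | Keyword arguments; should include `query` (the KQL text) and an optional `timespan` such as `1d`. | |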
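
The handler listed under Implementation Reference turns the `timespan` string into a `timedelta` by looking at its suffix. The sketch below mirrors that logic as it reads in the listing; the helper name `parse_timespan`, the `arguments` dictionary, and the sample query text are illustrative assumptions, not part of the tool.

```python
from datetime import timedelta


def parse_timespan(timespan: str) -> timedelta:
    """Hypothetical helper mirroring the handler's suffix-based parsing."""
    if timespan.endswith("d"):
        return timedelta(days=int(timespan[:-1]))
    if timespan.endswith("h"):
        return timedelta(hours=int(timespan[:-1]))
    if timespan.endswith("m"):
        return timedelta(minutes=int(timespan[:-1]))
    # Unrecognized suffix: the listing appears to fall back to one day.
    return timedelta(days=1)


# Example arguments as a caller might pass them (query text is illustrative).
arguments = {"query": "SecurityEvent | take 10", "timespan": "12h"}
window = parse_timespan(arguments["timespan"])
```

A value with a recognized suffix but a non-numeric prefix, such as `xd`, makes `int()` raise `ValueError`; the handler catches that case and returns an "Invalid timespan format" error dictionary instead of running the query.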
Implementation Reference
- tools/query_tools.py:39-223 (handler): The async `run` method executes KQL queries against Azure Monitor Logs. It extracts `query` (required) and `timespan` (optional, e.g. `1d`), gets the logs client and workspace, runs the query asynchronously, converts the table results into JSON-safe columns and rows, and returns a structured result with execution time, warnings, and errors.

      async def run(self, ctx: Context, **kwargs):
          """
          Run a KQL query against Azure Monitor Logs.

          Args:
              ctx (Context): The MCP context.
              **kwargs: Should include 'query' and optional 'timespan'.

          Returns:
              dict: Query results and metadata, or error information.
          """
          # Extract parameters using the centralized parameter extraction from MCPToolBase
          query = self._extract_param(kwargs, "query")
          timespan = self._extract_param(kwargs, "timespan", "1d")
          logger = self.logger

          if not query:
              logger.error("Missing required parameter: query")
              return {
                  "valid": False,
                  "errors": ["Missing required parameter: query"],
                  "error": "Missing required parameter: query",
                  "result_count": 0,
                  "columns": [],
                  "rows": [],
                  "warnings": ["Missing required parameter: query"],
                  "message": "Missing required parameter: query",
              }

          logs_client, workspace_id = self.get_logs_client_and_workspace(ctx)
          if logs_client is None or workspace_id is None:
              logger.error(
                  "Azure Monitor Logs client or workspace_id is not initialized."
              )
              return {
                  "valid": False,
                  "errors": [
                      (
                          "Azure Monitor Logs client or workspace_id is not initialized. "  # noqa: E501
                          "Check your credentials and configuration."
                      )
                  ],
                  "error": (
                      "Azure Monitor Logs client or workspace_id is not initialized. "  # noqa: E501
                      "Check your credentials and configuration."
                  ),
                  "result_count": 0,
                  "columns": [],
                  "rows": [],
                  "warnings": [
                      "Azure Monitor Logs client or workspace_id is not initialized."
                  ],
                  "message": "Azure Monitor Logs client or workspace_id is not initialized.",
              }

          start_time = time.perf_counter()
          timespan_obj = None
          try:
              if timespan:
                  if timespan.endswith("d"):
                      timespan_obj = timedelta(days=int(timespan[:-1]))
                  elif timespan.endswith("h"):
                      timespan_obj = timedelta(hours=int(timespan[:-1]))
                  elif timespan.endswith("m"):
                      timespan_obj = timedelta(minutes=int(timespan[:-1]))
                  else:
                      timespan_obj = timedelta(days=1)  # noqa: E501
          except Exception as e:
              logger.error("Invalid timespan format: %s", e)
              return {
                  "error": (
                      "Invalid timespan format: %s. Use format like '1d', '12h', or '30m'."
                      % str(e)
                  ),
                  "result_count": 0,
                  "columns": [],
                  "rows": [],
                  "warnings": ["Invalid timespan format: %s" % str(e)],
                  "message": "Invalid timespan format: %s" % str(e),
              }

          warnings = []
          match = re.search(r"\b(take|limit)\s+(\d+)", query, re.IGNORECASE)
          if match:
              n = int(match.group(2))
              if n > 250:
                  warnings.append(
                      f"Large result set requested ({n} rows). "
                      "Consider using a smaller limit for better performance."
                  )  # noqa: E501

          try:
              # Execute the query using task manager for async compatibility
              response = await run_in_thread(
                  logs_client.query_workspace,
                  workspace_id=workspace_id,
                  query=query,
                  timespan=timespan_obj,
                  name=f"query_logs_{hash(query) % 10000}",
              )
              exec_time_ms = int((time.perf_counter() - start_time) * 1000)

              def make_json_safe(val):
                  if isinstance(val, datetime):
                      return val.isoformat()
                  if isinstance(val, date):
                      return val.isoformat()
                  return val

              def get_col_info(col, idx):
                  # Azure SDK columns may have name/type/ordinal attributes,
                  # or just be strings
                  return {
                      "name": getattr(col, "name", col),
                      "type": getattr(col, "type", getattr(col, "column_type", "string")),
                      "ordinal": getattr(col, "ordinal", idx),
                  }

              if response and getattr(response, "tables", None):
                  table = response.tables[0]
                  columns = [
                      get_col_info(col, idx) for idx, col in enumerate(table.columns)
                  ]
                  rows = [list(row) for row in table.rows]
                  dict_rows = [
                      {
                          col["name"]: make_json_safe(cell)
                          for col, cell in zip(columns, row)
                      }
                      for row in rows
                  ]
                  result_obj = {
                      "valid": True,
                      "errors": [],
                      "query": query,
                      "timespan": timespan,
                      "result_count": len(dict_rows),
                      "columns": columns,
                      "rows": dict_rows,
                      "execution_time_ms": exec_time_ms,
                      "warnings": warnings,
                      "message": "Query executed successfully",
                  }
                  return result_obj
              else:
                  result_obj = {
                      "valid": True,
                      "errors": [],
                      "query": query,
                      "timespan": timespan,
                      "result_count": 0,
                      "columns": [],
                      "rows": [],
                      "execution_time_ms": int((time.perf_counter() - start_time) * 1000),
                      "warnings": warnings,
                      "message": "Query returned no tables or results",
                  }
                  return result_obj
          except TimeoutError:
              logger.error("Query timed out after 60 seconds")
              return {
                  "valid": False,
                  "errors": ["Query timed out after 60 seconds"],
                  "error": "Query timed out after 60 seconds",
                  "result_count": 0,
                  "columns": [],
                  "rows": [],
                  "warnings": ["Query timed out after 60 seconds"],
                  "message": "Query timed out after 60 seconds",
              }
          except Exception as e:
              logger.error("Error executing logs query: %s", str(e), exc_info=True)
              return {
                  "valid": False,
                  "errors": [f"Error executing query: {str(e)}"],
                  "error": f"Error executing query: {str(e)}",
                  "result_count": 0,
                  "columns": [],
                  "rows": [],
                  "warnings": [f"Error executing query: {str(e)}"],
                  "message": f"Error executing query: {str(e)}",
              }

- tools/query_tools.py:29-38 (schema): The tool class definition, name, and description, which form the basis for the input schema (expects a `query` string and an optional `timespan` string such as `1d`) and the output (a dict with `valid`, `errors`, `result_count`, `columns`, `rows`, and related keys). Inherits from `MCPToolBase` for standardized handling.

      class SentinelLogsSearchTool(MCPToolBase):
          """
          Tool that runs a KQL query against Azure Monitor Logs.

          Supports both server and direct invocation (integration tests).
          """

          name = "sentinel_logs_search"
          description = "Run a KQL query against Azure Monitor"

- tools/query_tools.py:586-595 (registration): The `register_tools` function registers `SentinelLogsSearchTool` (and its companion dummy-data tool) with the FastMCP server instance via the `register` classmethod.

      def register_tools(mcp: FastMCP):
          """
          Register Azure Monitor query tools with the MCP server.

          Args:
              mcp (FastMCP): The MCP server.
          """
          SentinelLogsSearchTool.register(mcp)
          SentinelLogsSearchWithDummyDataTool.register(mcp)
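
Two steps of the handler can be tried in isolation without an Azure connection: the `take`/`limit` size warning and the conversion of result rows into JSON-safe dictionaries. The following is a minimal sketch under stated assumptions; the function names `limit_warnings` and `rows_to_dicts`, the shortened warning text, and the sample column names and row are hypothetical stand-ins, and only the regular expression, the 250-row threshold, and the ISO-format conversion are taken from the handler.

```python
import re
from datetime import date, datetime


def limit_warnings(query: str, threshold: int = 250) -> list:
    """Warn when a query asks for more rows than the threshold (hypothetical helper)."""
    match = re.search(r"\b(take|limit)\s+(\d+)", query, re.IGNORECASE)
    if match and int(match.group(2)) > threshold:
        return [f"Large result set requested ({int(match.group(2))} rows)."]
    return []


def make_json_safe(val):
    """Convert date and datetime values to ISO 8601 strings; pass others through."""
    if isinstance(val, (datetime, date)):
        return val.isoformat()
    return val


def rows_to_dicts(column_names, rows):
    """Pair each cell with its column name, as the handler does for `rows`."""
    return [
        {name: make_json_safe(cell) for name, cell in zip(column_names, row)}
        for row in rows
    ]


# Stand-in data: one row shaped like a log table result (values are made up).
sample = rows_to_dicts(
    ["TimeGenerated", "Account"],
    [[datetime(2024, 1, 2, 3, 4, 5), "alice"]],
)
```

Because the warning only inspects the query text, it flags `take 500` even when the table holds fewer rows; it is advisory and does not block execution.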