Skip to main content
Glama
dstreefkerk

ms-sentinel-mcp-server

by dstreefkerk
query_tools.py24.2 kB
""" query_tools.py Azure Monitor query tools for the MCP server. This module provides: - SentinelLogsSearchTool: Run KQL queries against Azure Monitor Logs. - SentinelLogsSearchWithDummyDataTool: Test KQL queries with mock data using a datatable construct. All tools are MCPToolBase compliant and are designed for both server and direct invocation. """ import re import time import json from datetime import timedelta, datetime, date from typing import List, Dict from mcp.server.fastmcp import Context, FastMCP from tools.base import ( MCPToolBase, ) # May show import error in some test runners; see project memories from utilities.task_manager import ( run_in_thread, ) # May show import error in some test runners # pylint: disable=too-few-public-methods, too-many-return-statements, too-many-branches, too-many-locals class SentinelLogsSearchTool(MCPToolBase): """ Tool that runs a KQL query against Azure Monitor Logs. Supports both server and direct invocation (integration tests). """ name = "sentinel_logs_search" description = "Run a KQL query against Azure Monitor" async def run(self, ctx: Context, **kwargs): """ Run a KQL query against Azure Monitor Logs. Args: ctx (Context): The MCP context. **kwargs: Should include 'query' and optional 'timespan'. Returns: dict: Query results and metadata, or error information. """ # Extract parameters using the centralized parameter extraction from MCPToolBase query = self._extract_param(kwargs, "query") timespan = self._extract_param(kwargs, "timespan", "1d") logger = self.logger if not query: logger.error("Missing required parameter: query") return { "valid": False, "errors": ["Missing required parameter: query"], "error": "Missing required parameter: query", "result_count": 0, "columns": [], "rows": [], "warnings": ["Missing required parameter: query"], "message": "Missing required parameter: query", } logs_client, workspace_id = self.get_logs_client_and_workspace(ctx) if logs_client is None or workspace_id is None: logger.error( "Azure Monitor Logs client or workspace_id is not initialized." ) return { "valid": False, "errors": [ ( "Azure Monitor Logs client or workspace_id is not initialized. " # noqa: E501 "Check your credentials and configuration." ) ], "error": ( "Azure Monitor Logs client or workspace_id is not initialized. " # noqa: E501 "Check your credentials and configuration." ), "result_count": 0, "columns": [], "rows": [], "warnings": [ "Azure Monitor Logs client or workspace_id is not initialized." ], "message": "Azure Monitor Logs client or workspace_id is not initialized.", } start_time = time.perf_counter() timespan_obj = None try: if timespan: if timespan.endswith("d"): timespan_obj = timedelta(days=int(timespan[:-1])) elif timespan.endswith("h"): timespan_obj = timedelta(hours=int(timespan[:-1])) elif timespan.endswith("m"): timespan_obj = timedelta(minutes=int(timespan[:-1])) else: timespan_obj = timedelta(days=1) # noqa: E501 except Exception as e: logger.error("Invalid timespan format: %s", e) return { "error": ( "Invalid timespan format: %s. Use format like '1d', '12h', or '30m'." % str(e) ), "result_count": 0, "columns": [], "rows": [], "warnings": ["Invalid timespan format: %s" % str(e)], "message": "Invalid timespan format: %s" % str(e), } warnings = [] match = re.search(r"\b(take|limit)\s+(\d+)", query, re.IGNORECASE) if match: n = int(match.group(2)) if n > 250: warnings.append( f"Large result set requested ({n} rows). " "Consider using a smaller limit for better performance." ) # noqa: E501 try: # Execute the query using task manager for async compatibility response = await run_in_thread( logs_client.query_workspace, workspace_id=workspace_id, query=query, timespan=timespan_obj, name=f"query_logs_{hash(query) % 10000}", ) exec_time_ms = int((time.perf_counter() - start_time) * 1000) def make_json_safe(val): if isinstance(val, datetime): return val.isoformat() if isinstance(val, date): return val.isoformat() return val def get_col_info(col, idx): # Azure SDK columns may have name/type/ordinal attributes, # or just be strings return { "name": getattr(col, "name", col), "type": getattr(col, "type", getattr(col, "column_type", "string")), "ordinal": getattr(col, "ordinal", idx), } if response and getattr(response, "tables", None): table = response.tables[0] columns = [ get_col_info(col, idx) for idx, col in enumerate(table.columns) ] rows = [list(row) for row in table.rows] dict_rows = [ { col["name"]: make_json_safe(cell) for col, cell in zip(columns, row) } for row in rows ] result_obj = { "valid": True, "errors": [], "query": query, "timespan": timespan, "result_count": len(dict_rows), "columns": columns, "rows": dict_rows, "execution_time_ms": exec_time_ms, "warnings": warnings, "message": "Query executed successfully", } return result_obj else: result_obj = { "valid": True, "errors": [], "query": query, "timespan": timespan, "result_count": 0, "columns": [], "rows": [], "execution_time_ms": int((time.perf_counter() - start_time) * 1000), "warnings": warnings, "message": "Query returned no tables or results", } return result_obj except TimeoutError: logger.error("Query timed out after 60 seconds") return { "valid": False, "errors": ["Query timed out after 60 seconds"], "error": "Query timed out after 60 seconds", "result_count": 0, "columns": [], "rows": [], "warnings": ["Query timed out after 60 seconds"], "message": "Query timed out after 60 seconds", } except Exception as e: logger.error("Error executing logs query: %s", str(e), exc_info=True) return { "valid": False, "errors": [f"Error executing query: {str(e)}"], "error": f"Error executing query: {str(e)}", "result_count": 0, "columns": [], "rows": [], "warnings": [f"Error executing query: {str(e)}"], "message": f"Error executing query: {str(e)}", } class SentinelLogsSearchWithDummyDataTool(MCPToolBase): """ Tool to test a KQL query against user-provided mock data using a datatable construct. Validates the query locally before execution. Does not touch production data, but does run in Azure Monitor/Sentinel for a realistic simulation. """ name = "sentinel_logs_search_with_dummy_data" description = "Test a KQL query with mock data using a datatable. Validates KQL locally first." async def run(self, ctx: Context, **kwargs): """ Test a KQL query against mock data using a datatable construct. Args: ctx (Context): The MCP context. **kwargs: Should include 'query', either 'mock_data_xml' or 'mock_data_csv', and optional 'table_name'. Returns: dict: Query results and metadata, or error information. """ # Extract parameters using the centralized parameter extraction from MCPToolBase query = self._extract_param(kwargs, "query") table_name = self._extract_param(kwargs, "table_name", "TestTable") logger = self.logger if not query: logger.error("Missing required parameter: query") return { "valid": False, "errors": ["Missing required parameter: query"], "error": "Missing required parameter: query", "result": None, } # Process mock data from different formats mock_data = None # Option 1: XML format (preferred) mock_data_xml = self._extract_param(kwargs, "mock_data_xml", None) if mock_data_xml: try: import xml.etree.ElementTree as ET # Parse XML string root = ET.fromstring(mock_data_xml) mock_data = [] # Each row is a <row> element with column elements inside for row_elem in root.findall('./row'): row_data = {} for child in row_elem: tag = child.tag text = child.text # Try to convert values to appropriate types if text is not None: if text.isdigit(): row_data[tag] = int(text) elif text.replace(".", "", 1).isdigit() and text.count(".") <= 1: row_data[tag] = float(text) elif text.lower() == 'true': row_data[tag] = True elif text.lower() == 'false': row_data[tag] = False elif text.startswith('{') and text.endswith('}'): # Try to parse as JSON for nested objects try: row_data[tag] = json.loads(text) except json.JSONDecodeError: row_data[tag] = text else: row_data[tag] = text else: # Handle nested structure recursively by processing child elements nested_data = {} for nested_child in child: nested_tag = nested_child.tag nested_text = nested_child.text if nested_text is not None: if nested_text.isdigit(): nested_data[nested_tag] = int(nested_text) elif nested_text.replace(".", "", 1).isdigit() and nested_text.count(".") <= 1: nested_data[nested_tag] = float(nested_text) elif nested_text.lower() == 'true': nested_data[nested_tag] = True elif nested_text.lower() == 'false': nested_data[nested_tag] = False else: nested_data[nested_tag] = nested_text if nested_data: row_data[tag] = nested_data mock_data.append(row_data) except Exception as e: logger.error("Failed to parse mock_data_xml: %s", e) return { "valid": False, "errors": ["Failed to parse mock_data_xml: %s" % str(e)], "error": "Failed to parse mock_data_xml: %s" % str(e), "result": None, } # Option 2: CSV format (alternative) if not mock_data: mock_data_csv = self._extract_param(kwargs, "mock_data_csv", None) if mock_data_csv: try: import csv from io import StringIO csv_reader = csv.DictReader(StringIO(mock_data_csv)) mock_data = list(csv_reader) # Convert numeric strings to numbers where appropriate for row in mock_data: for key, value in row.items(): if isinstance(value, str): # Try to convert to int or float if possible if value.isdigit(): row[key] = int(value) elif value.replace(".", "", 1).isdigit() and value.count(".") <= 1: row[key] = float(value) elif value.lower() == 'true': row[key] = True elif value.lower() == 'false': row[key] = False except Exception as e: logger.error("Failed to parse mock_data_csv: %s", e) return { "valid": False, "errors": ["Failed to parse mock_data_csv: %s" % str(e)], "error": "Failed to parse mock_data_csv: %s" % str(e), "result": None, } # If no mock data provided in any format, return helpful error if not mock_data: logger.error("Missing required mock data") return { "valid": False, "errors": ["Missing required mock data in a supported format (XML or CSV)"], "error": "Please provide mock data in one of the supported formats: mock_data_xml or mock_data_csv", "sample_formats": { "mock_data_xml": """<rows> <row> <TimeGenerated>2023-01-01T12:00:00Z</TimeGenerated> <EventID>4624</EventID> <Computer>TestComputer</Computer> <Properties> <IpAddress>192.168.1.1</IpAddress> <Port>445</Port> </Properties> </row> </rows>""", "mock_data_csv": "TimeGenerated,EventID,Computer,Account\n2023-01-01T12:00:00Z,4624,TestComputer,TestUser" }, "result": None, } try: # Import inside function to avoid circular import issues # Importing KQLValidateTool here to avoid circular import issues # noqa: E402 # pylint: disable=C0415 from tools.kql_tools import ( KQLValidateTool, ) kql_validator = KQLValidateTool() validation_result = await kql_validator.run(ctx, query=query) if not validation_result.get("valid", False): logger.error( "KQL validation failed: %s", validation_result.get("errors") ) return { "valid": False, "errors": validation_result.get("errors", []), "error": validation_result.get("error", "KQL validation failed."), "result": None, } except Exception as e: logger.error("Local KQL validation error: %s", e, exc_info=True) return { "valid": False, "errors": ["Local KQL validation error: %s" % str(e)], "error": "Local KQL validation error: %s" % str(e), "result": None, } # Step 2: Generate datatable definition try: datatable_def, datatable_var = self._generate_datatable_and_let( mock_data, table_name ) except Exception as e: logger.error("Failed to generate datatable: %s", e, exc_info=True) return { "valid": False, "errors": ["Failed to generate datatable: %s" % str(e)], "error": "Failed to generate datatable: %s" % str(e), "result": None, } # Step 3: Rewrite query to use datatable try: # Replace all references to the original table with the datatable variable # Only replace whole word matches rewritten_query = re.sub( rf"\b{re.escape(table_name)}\b", datatable_var, query ) test_query = ( f"{datatable_def}\n\n// Original query with mock data table\n" f"{rewritten_query}" ) except Exception as e: logger.error("Failed to rewrite query: %s", e, exc_info=True) return { "valid": False, "errors": ["Failed to rewrite query: %s" % str(e)], "error": "Failed to rewrite query: %s" % str(e), "result": None, } # Step 4: Execute live query via SentinelLogsSearchTool try: # Import inside function to avoid circular import issues # Importing SentinelLogsSearchTool here to avoid circular import issues # noqa: E402 # pylint: disable=C0415 from tools.query_tools import ( SentinelLogsSearchTool as _SentinelLogsSearchTool, ) search_tool = _SentinelLogsSearchTool() result = await search_tool.run(ctx, query=test_query) return { "valid": result.get("valid", False), "errors": result.get("errors", []), "error": result.get("error", ""), "original_query": query, "table_name": table_name, "datatable_var": datatable_var, "test_query": test_query, "result": result, } except Exception as e: logger.error("Live query execution failed: %s", e, exc_info=True) return { "valid": False, "errors": ["Live query execution failed: %s" % str(e)], "error": "Live query execution failed: %s" % str(e), "result": None, } def _generate_datatable_and_let(self, mock_data: List[Dict], table_name: str): """ Generate a KQL datatable definition and let binding for mock data. Args: mock_data (list): List of dictionaries representing mock table rows. table_name (str): Name for the resulting table. Returns: tuple: (datatable_def, datatable_var) where datatable_def is the KQL definition string and datatable_var is the variable name. """ if not isinstance(mock_data, list) or not mock_data: raise ValueError("mock_data must be a non-empty list of records") first = mock_data[0] def kql_type(val): """ Infer the KQL type for a Python value. Args: val: The value to infer type for. Returns: str: KQL type string. """ if isinstance(val, bool): return "bool" if isinstance(val, int): return "long" if isinstance(val, float): return "real" if isinstance(val, dict): return "dynamic" if isinstance(val, list): return "dynamic" if isinstance(val, str): # Try to detect ISO8601 datetime dt_pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}" if re.match(dt_pattern, val): return "datetime" return "string" return "string" keys = list(first.keys()) col_types = {} dt_pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}" for k in keys: is_datetime = False for row in mock_data: v = row.get(k) if isinstance(v, str) and re.match(dt_pattern, v): is_datetime = True break if is_datetime: col_types[k] = "datetime" else: col_types[k] = kql_type(first[k]) columns = [(k, col_types[k]) for k in keys] col_defs = ",\n ".join([f"{k}:{t}" for k, t in columns]) def kql_repr(val, typ): """ Get the KQL string representation of a value for a given type. Args: val: The value. typ: The KQL type. Returns: str: KQL representation. """ if typ == "string": return f'"{str(val).replace("\"", "\\\"")}"' if typ == "datetime": # If already formatted as datetime(<val>), pass through if isinstance(val, str) and val.startswith("datetime("): return val return f"datetime({val})" if typ == "bool": return "true" if val else "false" if typ in ("long", "real"): return str(val) if typ == "dynamic": return f"dynamic({json.dumps(val)})" return f'"{str(val)}"' row_strs = [] for row in mock_data: row_vals = [] for k, t in columns: v = row.get(k) if ( t == "datetime" and isinstance(v, str) and not v.startswith("datetime(") ): row_vals.append(f"datetime({v})") else: row_vals.append(kql_repr(v, t)) row_strs.append(", ".join(row_vals)) datatable_var = f"{table_name}Dummy" datatable_def = ( f"let {datatable_var} = datatable(\n {col_defs}\n) [\n " + ",\n ".join(row_strs) + f"\n];\nlet {table_name} = {datatable_var};" ) return datatable_def, datatable_var def register_tools(mcp: FastMCP): """ Register Azure Monitor query tools with the MCP server. Args: mcp (FastMCP): The MCP server. """ SentinelLogsSearchTool.register(mcp) SentinelLogsSearchWithDummyDataTool.register(mcp)

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dstreefkerk/ms-sentinel-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server