Skip to main content
Glama
dstreefkerk

ms-sentinel-mcp-server

by dstreefkerk

sentinel_logs_search_with_dummy_data

Test KQL queries locally using mock data to validate syntax and logic before deployment in Microsoft Sentinel.

Instructions

Test a KQL query with mock data using a datatable. Validates KQL locally first.

Input Schema

Table: JSON Schema

| Name   | Required | Description | Default |
|--------|----------|-------------|---------|
| kwargs | Yes      | —           | —       |

Implementation Reference

  • The complete SentinelLogsSearchWithDummyDataTool class implementing the core tool logic: parses mock data (XML/CSV), validates KQL query, generates KQL datatable, rewrites query, and executes via live Sentinel query tool.
    class SentinelLogsSearchWithDummyDataTool(MCPToolBase):
        """
        Tool to test a KQL query against user-provided mock data using a datatable construct.

        Validates the query locally before execution. Does not touch production data, but does run
        in Azure Monitor/Sentinel for a realistic simulation.
        """

        name = "sentinel_logs_search_with_dummy_data"
        description = "Test a KQL query with mock data using a datatable. Validates KQL locally first."

        @staticmethod
        def _coerce_value(text: str, parse_json: bool = False):
            """
            Convert a raw string into the most specific Python type it represents.

            The order of checks (int, float, bool, optional JSON, raw string)
            mirrors the original inline conversion logic exactly.

            Args:
                text (str): Raw string value from XML or CSV mock data.
                parse_json (bool): Also attempt JSON decoding of '{...}'
                    strings (used only for top-level XML values).

            Returns:
                The coerced value, or the original string when no conversion
                applies.
            """
            if text.isdigit():
                return int(text)
            if text.replace(".", "", 1).isdigit() and text.count(".") <= 1:
                return float(text)
            if text.lower() == 'true':
                return True
            if text.lower() == 'false':
                return False
            if parse_json and text.startswith('{') and text.endswith('}'):
                # Nested objects may be supplied as JSON strings; fall back
                # to the raw text if decoding fails.
                try:
                    return json.loads(text)
                except json.JSONDecodeError:
                    return text
            return text

        def _parse_mock_xml(self, mock_data_xml: str):
            """
            Parse mock data supplied as <rows><row>...</row></rows> XML.

            Each child element of a <row> becomes a column. An element with no
            text but with children is treated as a single-level nested object
            (deeper nesting is ignored, matching the original behaviour).

            Args:
                mock_data_xml (str): XML document string.

            Returns:
                list[dict]: One dict per <row> element.

            Raises:
                xml.etree.ElementTree.ParseError: If the XML is malformed.
            """
            # Local import mirrors the original code's style of deferring
            # imports until they are needed.
            import xml.etree.ElementTree as ET

            root = ET.fromstring(mock_data_xml)
            rows = []
            for row_elem in root.findall('./row'):
                row_data = {}
                for child in row_elem:
                    if child.text is not None:
                        row_data[child.tag] = self._coerce_value(
                            child.text, parse_json=True
                        )
                    else:
                        # One nesting level only; children with no text are
                        # skipped, as in the original implementation.
                        nested = {
                            nc.tag: self._coerce_value(nc.text)
                            for nc in child
                            if nc.text is not None
                        }
                        if nested:
                            row_data[child.tag] = nested
                rows.append(row_data)
            return rows

        def _parse_mock_csv(self, mock_data_csv: str):
            """
            Parse mock data supplied as a CSV string with a header row.

            Args:
                mock_data_csv (str): CSV content; the first line is the header.

            Returns:
                list[dict]: One dict per data row, with numeric/boolean
                strings coerced to their Python equivalents.
            """
            import csv
            from io import StringIO

            rows = list(csv.DictReader(StringIO(mock_data_csv)))
            for row in rows:
                for key, value in row.items():
                    if isinstance(value, str):
                        row[key] = self._coerce_value(value)
            return rows

        async def run(self, ctx: Context, **kwargs):
            """
            Test a KQL query against mock data using a datatable construct.

            Args:
                ctx (Context): The MCP context.
                **kwargs: Should include 'query', either 'mock_data_xml' or
                    'mock_data_csv', and optional 'table_name'.

            Returns:
                dict: Query results and metadata, or error information.
            """
            # Extract parameters using the centralized parameter extraction
            # from MCPToolBase.
            query = self._extract_param(kwargs, "query")
            table_name = self._extract_param(kwargs, "table_name", "TestTable")
            logger = self.logger

            if not query:
                logger.error("Missing required parameter: query")
                return {
                    "valid": False,
                    "errors": ["Missing required parameter: query"],
                    "error": "Missing required parameter: query",
                    "result": None,
                }

            # Mock data may arrive in either of two formats; XML is preferred
            # and is tried first.
            mock_data = None

            mock_data_xml = self._extract_param(kwargs, "mock_data_xml", None)
            if mock_data_xml:
                try:
                    mock_data = self._parse_mock_xml(mock_data_xml)
                except Exception as e:
                    logger.error("Failed to parse mock_data_xml: %s", e)
                    return {
                        "valid": False,
                        "errors": ["Failed to parse mock_data_xml: %s" % str(e)],
                        "error": "Failed to parse mock_data_xml: %s" % str(e),
                        "result": None,
                    }

            # Option 2: CSV format (alternative)
            if not mock_data:
                mock_data_csv = self._extract_param(kwargs, "mock_data_csv", None)
                if mock_data_csv:
                    try:
                        mock_data = self._parse_mock_csv(mock_data_csv)
                    except Exception as e:
                        logger.error("Failed to parse mock_data_csv: %s", e)
                        return {
                            "valid": False,
                            "errors": ["Failed to parse mock_data_csv: %s" % str(e)],
                            "error": "Failed to parse mock_data_csv: %s" % str(e),
                            "result": None,
                        }

            # If no mock data provided in any format, return a helpful error
            # including sample payloads for both supported formats.
            if not mock_data:
                logger.error("Missing required mock data")
                return {
                    "valid": False,
                    "errors": ["Missing required mock data in a supported format (XML or CSV)"],
                    "error": "Please provide mock data in one of the supported formats: mock_data_xml or mock_data_csv",
                    "sample_formats": {
                        "mock_data_xml": """<rows>
        <row>
            <TimeGenerated>2023-01-01T12:00:00Z</TimeGenerated>
            <EventID>4624</EventID>
            <Computer>TestComputer</Computer>
            <Properties>
                <IpAddress>192.168.1.1</IpAddress>
                <Port>445</Port>
            </Properties>
        </row>
    </rows>""",
                        "mock_data_csv": "TimeGenerated,EventID,Computer,Account\n2023-01-01T12:00:00Z,4624,TestComputer,TestUser"
                    },
                    "result": None,
                }

            # Step 1: Validate the query locally before spending a round trip
            # to Azure Monitor.
            try:
                # Importing KQLValidateTool here to avoid circular import issues
                # noqa: E402  # pylint: disable=C0415
                from tools.kql_tools import (
                    KQLValidateTool,
                )

                kql_validator = KQLValidateTool()
                validation_result = await kql_validator.run(ctx, query=query)
                if not validation_result.get("valid", False):
                    logger.error(
                        "KQL validation failed: %s", validation_result.get("errors")
                    )
                    return {
                        "valid": False,
                        "errors": validation_result.get("errors", []),
                        "error": validation_result.get("error", "KQL validation failed."),
                        "result": None,
                    }
            except Exception as e:
                logger.error("Local KQL validation error: %s", e, exc_info=True)
                return {
                    "valid": False,
                    "errors": ["Local KQL validation error: %s" % str(e)],
                    "error": "Local KQL validation error: %s" % str(e),
                    "result": None,
                }

            # Step 2: Generate the datatable definition from the mock rows.
            try:
                datatable_def, datatable_var = self._generate_datatable_and_let(
                    mock_data, table_name
                )
            except Exception as e:
                logger.error("Failed to generate datatable: %s", e, exc_info=True)
                return {
                    "valid": False,
                    "errors": ["Failed to generate datatable: %s" % str(e)],
                    "error": "Failed to generate datatable: %s" % str(e),
                    "result": None,
                }

            # Step 3: Rewrite the query so references to the real table hit
            # the mock datatable instead (whole-word matches only, so e.g.
            # 'SecurityEvent' does not match 'SecurityEvents').
            try:
                rewritten_query = re.sub(
                    rf"\b{re.escape(table_name)}\b", datatable_var, query
                )
                test_query = (
                    f"{datatable_def}\n\n// Original query with mock data table\n"
                    f"{rewritten_query}"
                )
            except Exception as e:
                logger.error("Failed to rewrite query: %s", e, exc_info=True)
                return {
                    "valid": False,
                    "errors": ["Failed to rewrite query: %s" % str(e)],
                    "error": "Failed to rewrite query: %s" % str(e),
                    "result": None,
                }

            # Step 4: Execute the rewritten query via the live search tool.
            try:
                # Importing SentinelLogsSearchTool here to avoid circular import issues
                # noqa: E402  # pylint: disable=C0415
                from tools.query_tools import (
                    SentinelLogsSearchTool as _SentinelLogsSearchTool,
                )

                search_tool = _SentinelLogsSearchTool()
                result = await search_tool.run(ctx, query=test_query)
                return {
                    "valid": result.get("valid", False),
                    "errors": result.get("errors", []),
                    "error": result.get("error", ""),
                    "original_query": query,
                    "table_name": table_name,
                    "datatable_var": datatable_var,
                    "test_query": test_query,
                    "result": result,
                }
            except Exception as e:
                logger.error("Live query execution failed: %s", e, exc_info=True)
                return {
                    "valid": False,
                    "errors": ["Live query execution failed: %s" % str(e)],
                    "error": "Live query execution failed: %s" % str(e),
                    "result": None,
                }

        def _generate_datatable_and_let(self, mock_data: List[Dict], table_name: str):
            """
            Generate a KQL datatable definition and let binding for mock data.

            Args:
                mock_data (list): List of dictionaries representing mock table rows.
                table_name (str): Name for the resulting table.

            Returns:
                tuple: (datatable_def, datatable_var) where datatable_def
                is the KQL definition string and datatable_var is the variable name.

            Raises:
                ValueError: If mock_data is not a non-empty list.
            """
            if not isinstance(mock_data, list) or not mock_data:
                raise ValueError("mock_data must be a non-empty list of records")
            first = mock_data[0]

            # Compile the ISO8601 prefix check once; it is used for both
            # column typing and the per-value datetime detection.
            dt_re = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")

            def kql_type(val):
                """Infer the KQL column type for a Python value."""
                if isinstance(val, bool):
                    return "bool"
                if isinstance(val, int):
                    return "long"
                if isinstance(val, float):
                    return "real"
                if isinstance(val, (dict, list)):
                    return "dynamic"
                if isinstance(val, str) and dt_re.match(val):
                    return "datetime"
                return "string"

            # Column set comes from the first row; a column is typed datetime
            # if ANY row holds an ISO8601-looking string, otherwise its type
            # is inferred from the first row's value.
            keys = list(first.keys())
            col_types = {}
            for k in keys:
                if any(
                    isinstance(row.get(k), str) and dt_re.match(row[k])
                    for row in mock_data
                ):
                    col_types[k] = "datetime"
                else:
                    col_types[k] = kql_type(first[k])

            columns = [(k, col_types[k]) for k in keys]
            col_defs = ",\n    ".join(f"{k}:{t}" for k, t in columns)

            def kql_repr(val, typ):
                """Render a Python value as a KQL literal of the given type."""
                if typ == "string":
                    # Escape backslashes BEFORE quotes so pre-existing
                    # backslashes (e.g. Windows paths) survive intact. The
                    # previous implementation escaped quotes only, emitting
                    # corrupted KQL for values containing '\'.
                    escaped = str(val).replace("\\", "\\\\").replace('"', '\\"')
                    return f'"{escaped}"'
                if typ == "datetime":
                    # If already formatted as datetime(<val>), pass through.
                    if isinstance(val, str) and val.startswith("datetime("):
                        return val
                    return f"datetime({val})"
                if typ == "bool":
                    return "true" if val else "false"
                if typ in ("long", "real"):
                    return str(val)
                if typ == "dynamic":
                    return f"dynamic({json.dumps(val)})"
                return f'"{str(val)}"'

            row_strs = []
            for row in mock_data:
                row_vals = []
                for k, t in columns:
                    v = row.get(k)
                    if (
                        t == "datetime"
                        and isinstance(v, str)
                        and not v.startswith("datetime(")
                    ):
                        row_vals.append(f"datetime({v})")
                    else:
                        row_vals.append(kql_repr(v, t))
                row_strs.append(", ".join(row_vals))

            # Bind the datatable to <table>Dummy, then alias the original
            # table name to it so unmodified references also resolve.
            datatable_var = f"{table_name}Dummy"
            datatable_def = (
                f"let {datatable_var} = datatable(\n    {col_defs}\n) [\n    "
                + ",\n    ".join(row_strs)
                + f"\n];\nlet {table_name} = {datatable_var};"
            )
            return datatable_def, datatable_var
  • The register_tools function that registers the SentinelLogsSearchWithDummyDataTool with the MCP server instance.
    def register_tools(mcp: FastMCP):
        """
        Register Azure Monitor query tools with the MCP server.

        Args:
            mcp (FastMCP): The MCP server.
        """
        # Register each query tool class in turn.
        for tool_class in (SentinelLogsSearchTool, SentinelLogsSearchWithDummyDataTool):
            tool_class.register(mcp)
  • Private helper method within the tool class that generates the KQL 'datatable' and 'let' statements from provided mock data, inferring types and formatting values correctly for KQL.
    def _generate_datatable_and_let(self, mock_data: List[Dict], table_name: str):
        """
        Generate a KQL datatable definition and let binding for mock data.
    
        Args:
            mock_data (list): List of dictionaries representing mock table rows.
            table_name (str): Name for the resulting table.
    
        Returns:
            tuple: (datatable_def, datatable_var) where datatable_def
            is the KQL definition string and datatable_var is the variable name.
        """
        if not isinstance(mock_data, list) or not mock_data:
            raise ValueError("mock_data must be a non-empty list of records")
        first = mock_data[0]
    
        def kql_type(val):
            """
            Infer the KQL type for a Python value.
    
            Args:
                val: The value to infer type for.
            Returns:
                str: KQL type string.
            """
            if isinstance(val, bool):
                return "bool"
            if isinstance(val, int):
                return "long"
            if isinstance(val, float):
                return "real"
            if isinstance(val, dict):
                return "dynamic"
            if isinstance(val, list):
                return "dynamic"
            if isinstance(val, str):
                # Try to detect ISO8601 datetime
                dt_pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"
                if re.match(dt_pattern, val):
                    return "datetime"
                return "string"
            return "string"
    
        keys = list(first.keys())
        col_types = {}
        dt_pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"
        for k in keys:
            is_datetime = False
            for row in mock_data:
                v = row.get(k)
                if isinstance(v, str) and re.match(dt_pattern, v):
                    is_datetime = True
                    break
            if is_datetime:
                col_types[k] = "datetime"
            else:
                col_types[k] = kql_type(first[k])
    
        columns = [(k, col_types[k]) for k in keys]
        col_defs = ",\n    ".join([f"{k}:{t}" for k, t in columns])
    
        def kql_repr(val, typ):
            """
            Get the KQL string representation of a value for a given type.
    
            Args:
                val: The value.
                typ: The KQL type.
            Returns:
                str: KQL representation.
            """
            if typ == "string":
                return f'"{str(val).replace("\"", "\\\"")}"'
            if typ == "datetime":
                # If already formatted as datetime(<val>), pass through
                if isinstance(val, str) and val.startswith("datetime("):
                    return val
                return f"datetime({val})"
            if typ == "bool":
                return "true" if val else "false"
            if typ in ("long", "real"):
                return str(val)
            if typ == "dynamic":
                return f"dynamic({json.dumps(val)})"
            return f'"{str(val)}"'
    
        row_strs = []
        for row in mock_data:
            row_vals = []
            for k, t in columns:
                v = row.get(k)
                if (
                    t == "datetime"
                    and isinstance(v, str)
                    and not v.startswith("datetime(")
                ):
                    row_vals.append(f"datetime({v})")
                else:
                    row_vals.append(kql_repr(v, t))
            row_strs.append(", ".join(row_vals))
        datatable_var = f"{table_name}Dummy"
        datatable_def = (
            f"let {datatable_var} = datatable(\n    {col_defs}\n) [\n    "
            + ",\n    ".join(row_strs)
            + f"\n];\nlet {table_name} = {datatable_var};"
        )
        return datatable_def, datatable_var

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dstreefkerk/ms-sentinel-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server