
ms-sentinel-mcp-server

by dstreefkerk

sentinel_logs_search_with_dummy_data

Test KQL queries locally using mock data to validate syntax and logic before deployment in Microsoft Sentinel.

Instructions

Test a KQL query with mock data using a datatable. Validates KQL locally first.
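The tool is invoked with keyword arguments. A minimal sketch of a hypothetical payload, using the parameter names the implementation extracts (`query`, `mock_data_csv` or `mock_data_xml`, and the optional `table_name`):

```python
# Hypothetical kwargs payload for sentinel_logs_search_with_dummy_data.
# Parameter names match those extracted in the tool's run() method;
# the query and data values are illustrative only.
kwargs = {
    # KQL query to test; should reference the table named below
    "query": "TestTable | where EventID == 4624 | summarize count() by Computer",
    # Mock rows as CSV with a header line (mock_data_xml is the XML alternative)
    "mock_data_csv": (
        "TimeGenerated,EventID,Computer,Account\n"
        "2023-01-01T12:00:00Z,4624,TestComputer,TestUser"
    ),
    # Optional; defaults to "TestTable"
    "table_name": "TestTable",
}
```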

Input Schema

Name     Required   Description   Default
kwargs   Yes        —             —

Implementation Reference

  • The complete SentinelLogsSearchWithDummyDataTool class implementing the core tool logic: parses mock data (XML/CSV), validates the KQL query, generates a KQL datatable, rewrites the query, and executes it via the live Sentinel query tool.
    class SentinelLogsSearchWithDummyDataTool(MCPToolBase):
        """
        Tool to test a KQL query against user-provided mock data using a datatable construct.
    
        Validates the query locally before execution. Does not touch production data, but does run
        in Azure Monitor/Sentinel for a realistic simulation.
        """
    
        name = "sentinel_logs_search_with_dummy_data"
        description = "Test a KQL query with mock data using a datatable. Validates KQL locally first."
    
        async def run(self, ctx: Context, **kwargs):
            """
            Test a KQL query against mock data using a datatable construct.
    
            Args:
                ctx (Context): The MCP context.
                **kwargs: Should include 'query', either 'mock_data_xml' or 'mock_data_csv', and optional 'table_name'.
    
            Returns:
                dict: Query results and metadata, or error information.
            """
            # Extract parameters using the centralized parameter extraction from MCPToolBase
            query = self._extract_param(kwargs, "query")
            table_name = self._extract_param(kwargs, "table_name", "TestTable")
            logger = self.logger
            
            if not query:
                logger.error("Missing required parameter: query")
                return {
                    "valid": False,
                    "errors": ["Missing required parameter: query"],
                    "error": "Missing required parameter: query",
                    "result": None,
                }
                
            # Process mock data from different formats
            mock_data = None
            
            # Option 1: XML format (preferred)
            mock_data_xml = self._extract_param(kwargs, "mock_data_xml", None)
            if mock_data_xml:
                try:
                    import xml.etree.ElementTree as ET
                    
                    # Parse XML string
                    root = ET.fromstring(mock_data_xml)
                    mock_data = []
                    
                    # Each row is a <row> element with column elements inside
                    for row_elem in root.findall('./row'):
                        row_data = {}
                        for child in row_elem:
                            tag = child.tag
                            text = child.text
                            
                            # Try to convert values to appropriate types
                            if text is not None:
                                if text.isdigit():
                                    row_data[tag] = int(text)
                                elif text.replace(".", "", 1).isdigit() and text.count(".") <= 1:
                                    row_data[tag] = float(text)
                                elif text.lower() == 'true':
                                    row_data[tag] = True
                                elif text.lower() == 'false':
                                    row_data[tag] = False
                                elif text.startswith('{') and text.endswith('}'):
                                    # Try to parse as JSON for nested objects
                                    try:
                                        row_data[tag] = json.loads(text)
                                    except json.JSONDecodeError:
                                        row_data[tag] = text
                                else:
                                    row_data[tag] = text
                            else:
                                # Handle nested structure recursively by processing child elements
                                nested_data = {}
                                for nested_child in child:
                                    nested_tag = nested_child.tag
                                    nested_text = nested_child.text
                                    
                                    if nested_text is not None:
                                        if nested_text.isdigit():
                                            nested_data[nested_tag] = int(nested_text)
                                        elif nested_text.replace(".", "", 1).isdigit() and nested_text.count(".") <= 1:
                                            nested_data[nested_tag] = float(nested_text)
                                        elif nested_text.lower() == 'true':
                                            nested_data[nested_tag] = True
                                        elif nested_text.lower() == 'false':
                                            nested_data[nested_tag] = False
                                        else:
                                            nested_data[nested_tag] = nested_text
                                
                                if nested_data:
                                    row_data[tag] = nested_data
                        
                        mock_data.append(row_data)
                except Exception as e:
                    logger.error("Failed to parse mock_data_xml: %s", e)
                    return {
                        "valid": False,
                        "errors": ["Failed to parse mock_data_xml: %s" % str(e)],
                        "error": "Failed to parse mock_data_xml: %s" % str(e),
                        "result": None,
                    }
                    
            # Option 2: CSV format (alternative)
            if not mock_data:
                mock_data_csv = self._extract_param(kwargs, "mock_data_csv", None)
                if mock_data_csv:
                    try:
                        import csv
                        from io import StringIO
                        
                        csv_reader = csv.DictReader(StringIO(mock_data_csv))
                        mock_data = list(csv_reader)
                        
                        # Convert numeric strings to numbers where appropriate
                        for row in mock_data:
                            for key, value in row.items():
                                if isinstance(value, str):
                                    # Try to convert to int or float if possible
                                    if value.isdigit():
                                        row[key] = int(value)
                                    elif value.replace(".", "", 1).isdigit() and value.count(".") <= 1:
                                        row[key] = float(value)
                                    elif value.lower() == 'true':
                                        row[key] = True
                                    elif value.lower() == 'false':
                                        row[key] = False
                    except Exception as e:
                        logger.error("Failed to parse mock_data_csv: %s", e)
                        return {
                            "valid": False, 
                            "errors": ["Failed to parse mock_data_csv: %s" % str(e)],
                            "error": "Failed to parse mock_data_csv: %s" % str(e),
                            "result": None,
                        }
            
            # If no mock data provided in any format, return helpful error
            if not mock_data:
                logger.error("Missing required mock data")
                return {
                    "valid": False,
                    "errors": ["Missing required mock data in a supported format (XML or CSV)"],
                    "error": "Please provide mock data in one of the supported formats: mock_data_xml or mock_data_csv",
                    "sample_formats": {
                        "mock_data_xml": """<rows>
        <row>
            <TimeGenerated>2023-01-01T12:00:00Z</TimeGenerated>
            <EventID>4624</EventID>
            <Computer>TestComputer</Computer>
            <Properties>
                <IpAddress>192.168.1.1</IpAddress>
                <Port>445</Port>
            </Properties>
        </row>
    </rows>""",
                        "mock_data_csv": "TimeGenerated,EventID,Computer,Account\n2023-01-01T12:00:00Z,4624,TestComputer,TestUser"
                    },
                    "result": None,
                }
            try:
                # Import here rather than at module level to avoid a circular import
                # noqa: E402  # pylint: disable=C0415
                from tools.kql_tools import (
                    KQLValidateTool,
                )
    
                kql_validator = KQLValidateTool()
                validation_result = await kql_validator.run(ctx, query=query)
                if not validation_result.get("valid", False):
                    logger.error(
                        "KQL validation failed: %s", validation_result.get("errors")
                    )
                    return {
                        "valid": False,
                        "errors": validation_result.get("errors", []),
                        "error": validation_result.get("error", "KQL validation failed."),
                        "result": None,
                    }
            except Exception as e:
                logger.error("Local KQL validation error: %s", e, exc_info=True)
                return {
                    "valid": False,
                    "errors": ["Local KQL validation error: %s" % str(e)],
                    "error": "Local KQL validation error: %s" % str(e),
                    "result": None,
                }
            # Step 2: Generate datatable definition
            try:
                datatable_def, datatable_var = self._generate_datatable_and_let(
                    mock_data, table_name
                )
            except Exception as e:
                logger.error("Failed to generate datatable: %s", e, exc_info=True)
                return {
                    "valid": False,
                    "errors": ["Failed to generate datatable: %s" % str(e)],
                    "error": "Failed to generate datatable: %s" % str(e),
                    "result": None,
                }
            # Step 3: Rewrite query to use datatable
            try:
                # Replace all references to the original table with the datatable variable
                # Only replace whole word matches
                rewritten_query = re.sub(
                    rf"\b{re.escape(table_name)}\b", datatable_var, query
                )
                test_query = (
                    f"{datatable_def}\n\n// Original query with mock data table\n"
                    f"{rewritten_query}"
                )
            except Exception as e:
                logger.error("Failed to rewrite query: %s", e, exc_info=True)
                return {
                    "valid": False,
                    "errors": ["Failed to rewrite query: %s" % str(e)],
                    "error": "Failed to rewrite query: %s" % str(e),
                    "result": None,
                }
            # Step 4: Execute live query via SentinelLogsSearchTool
            try:
                # Import here rather than at module level to avoid a circular import
                # noqa: E402  # pylint: disable=C0415
                from tools.query_tools import (
                    SentinelLogsSearchTool as _SentinelLogsSearchTool,
                )
    
                search_tool = _SentinelLogsSearchTool()
                result = await search_tool.run(ctx, query=test_query)
                return {
                    "valid": result.get("valid", False),
                    "errors": result.get("errors", []),
                    "error": result.get("error", ""),
                    "original_query": query,
                    "table_name": table_name,
                    "datatable_var": datatable_var,
                    "test_query": test_query,
                    "result": result,
                }
            except Exception as e:
                logger.error("Live query execution failed: %s", e, exc_info=True)
                return {
                    "valid": False,
                    "errors": ["Live query execution failed: %s" % str(e)],
                    "error": "Live query execution failed: %s" % str(e),
                    "result": None,
                }
    
        def _generate_datatable_and_let(self, mock_data: List[Dict], table_name: str):
            """
            Generate a KQL datatable definition and let binding for mock data.
    
            Args:
                mock_data (list): List of dictionaries representing mock table rows.
                table_name (str): Name for the resulting table.
    
            Returns:
                tuple: (datatable_def, datatable_var) where datatable_def
                is the KQL definition string and datatable_var is the variable name.
            """
            if not isinstance(mock_data, list) or not mock_data:
                raise ValueError("mock_data must be a non-empty list of records")
            first = mock_data[0]
    
            def kql_type(val):
                """
                Infer the KQL type for a Python value.
    
                Args:
                    val: The value to infer type for.
                Returns:
                    str: KQL type string.
                """
                if isinstance(val, bool):
                    return "bool"
                if isinstance(val, int):
                    return "long"
                if isinstance(val, float):
                    return "real"
                if isinstance(val, dict):
                    return "dynamic"
                if isinstance(val, list):
                    return "dynamic"
                if isinstance(val, str):
                    # Try to detect ISO8601 datetime
                    dt_pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"
                    if re.match(dt_pattern, val):
                        return "datetime"
                    return "string"
                return "string"
    
            keys = list(first.keys())
            col_types = {}
            dt_pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"
            for k in keys:
                is_datetime = False
                for row in mock_data:
                    v = row.get(k)
                    if isinstance(v, str) and re.match(dt_pattern, v):
                        is_datetime = True
                        break
                if is_datetime:
                    col_types[k] = "datetime"
                else:
                    col_types[k] = kql_type(first[k])
    
            columns = [(k, col_types[k]) for k in keys]
            col_defs = ",\n    ".join([f"{k}:{t}" for k, t in columns])
    
            def kql_repr(val, typ):
                """
                Get the KQL string representation of a value for a given type.
    
                Args:
                    val: The value.
                    typ: The KQL type.
                Returns:
                    str: KQL representation.
                """
                if typ == "string":
                    return f'"{str(val).replace("\"", "\\\"")}"'
                if typ == "datetime":
                    # If already formatted as datetime(<val>), pass through
                    if isinstance(val, str) and val.startswith("datetime("):
                        return val
                    return f"datetime({val})"
                if typ == "bool":
                    return "true" if val else "false"
                if typ in ("long", "real"):
                    return str(val)
                if typ == "dynamic":
                    return f"dynamic({json.dumps(val)})"
                return f'"{str(val)}"'
    
            row_strs = []
            for row in mock_data:
                row_vals = []
                for k, t in columns:
                    v = row.get(k)
                    if (
                        t == "datetime"
                        and isinstance(v, str)
                        and not v.startswith("datetime(")
                    ):
                        row_vals.append(f"datetime({v})")
                    else:
                        row_vals.append(kql_repr(v, t))
                row_strs.append(", ".join(row_vals))
            datatable_var = f"{table_name}Dummy"
            datatable_def = (
                f"let {datatable_var} = datatable(\n    {col_defs}\n) [\n    "
                + ",\n    ".join(row_strs)
                + f"\n];\nlet {table_name} = {datatable_var};"
            )
            return datatable_def, datatable_var
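The helper above emits a `let` binding over a KQL `datatable` literal. To illustrate what that looks like, here is a condensed, hypothetical re-implementation of its type inference and value formatting (a sketch, not the tool's exact output):

```python
import json
import re

DT_PATTERN = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"  # ISO8601 prefix, as in the helper

def kql_type(val):
    # Simplified mirror of the helper's Python-to-KQL type inference
    if isinstance(val, bool):
        return "bool"
    if isinstance(val, int):
        return "long"
    if isinstance(val, float):
        return "real"
    if isinstance(val, (dict, list)):
        return "dynamic"
    if isinstance(val, str) and re.match(DT_PATTERN, val):
        return "datetime"
    return "string"

def kql_repr(val, typ):
    # Simplified mirror of the helper's value formatting
    if typ == "datetime":
        return f"datetime({val})"
    if typ == "bool":
        return "true" if val else "false"
    if typ in ("long", "real"):
        return str(val)
    if typ == "dynamic":
        return f"dynamic({json.dumps(val)})"
    escaped = str(val).replace('"', '\\"')
    return f'"{escaped}"'

rows = [{"TimeGenerated": "2023-01-01T12:00:00Z", "EventID": 4624, "Computer": "PC01"}]
cols = [(k, kql_type(v)) for k, v in rows[0].items()]
col_defs = ", ".join(f"{k}:{t}" for k, t in cols)
body = ", ".join(kql_repr(r[k], t) for r in rows for k, t in cols)
print(f"let TestTableDummy = datatable({col_defs}) [{body}];")
```

Running this prints a single-line datatable definition with the inferred column types (`datetime`, `long`, `string`); the real helper additionally pretty-prints the definition and appends a second `let` that aliases the original table name to the dummy variable.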
  • The register_tools function that registers the SentinelLogsSearchWithDummyDataTool with the MCP server instance.
    def register_tools(mcp: FastMCP):
        """
        Register Azure Monitor query tools with the MCP server.
    
        Args:
            mcp (FastMCP): The MCP server.
        """
        SentinelLogsSearchTool.register(mcp)
        SentinelLogsSearchWithDummyDataTool.register(mcp)
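The query-rewrite step (Step 3 in `run`) swaps every whole-word reference to the original table for the datatable variable. A minimal sketch of that substitution, using an illustrative query:

```python
import re

# Illustrative values; the real tool derives these from kwargs
table_name = "SecurityEvent"
datatable_var = f"{table_name}Dummy"
query = "SecurityEvent | where EventID == 4624 | join SecurityEvent on Computer"

# \b anchors make this a whole-word replacement, so a hypothetical
# "SecurityEventExtra" table would be left untouched
rewritten = re.sub(rf"\b{re.escape(table_name)}\b", datatable_var, query)
print(rewritten)
```

Both occurrences of the table name are replaced, including the one inside the `join`.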
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden of behavioral disclosure. It states the tool tests queries with mock data and validates KQL locally, which implies a read-only, non-destructive operation, but doesn't clarify permissions, rate limits, or what 'locally' entails (e.g., no network calls). For a tool with zero annotation coverage, this leaves significant gaps in understanding its behavior and constraints.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is highly concise and front-loaded, consisting of a single sentence that efficiently conveys the core functionality: 'Test a KQL query with mock data using a datatable. Validates KQL locally first.' Every word earns its place, with no wasted text or redundancy, making it easy to parse quickly.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity of a query-testing tool with 1 parameter, 0% schema coverage, no annotations, and no output schema, the description is incomplete. It lacks details on parameter usage, behavioral traits (e.g., error handling, mock data scope), and output format. While concise, it doesn't provide enough context for an agent to use the tool effectively without additional assumptions.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The input schema has 1 parameter ('kwargs') with 0% description coverage, and the tool description provides no information about parameters. It doesn't explain what 'kwargs' should contain (e.g., the KQL query string, datatable configuration, or validation options). With low schema coverage, the description fails to compensate, leaving the parameter's meaning and usage unclear.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.
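One way to close this gap would be an explicit input schema in place of the opaque `kwargs` wrapper. A hypothetical sketch, with property names taken from the implementation and descriptions invented for illustration:

```python
# Hypothetical JSON Schema for the tool's input; the published schema exposes
# only an undocumented "kwargs" object. All descriptions here are illustrative.
input_schema = {
    "type": "object",
    "properties": {
        "query": {
            "type": "string",
            "description": "KQL query to test; must reference table_name",
        },
        "mock_data_xml": {
            "type": "string",
            "description": "Mock rows as <rows><row>...</row></rows> XML",
        },
        "mock_data_csv": {
            "type": "string",
            "description": "Mock rows as CSV with a header line",
        },
        "table_name": {
            "type": "string",
            "default": "TestTable",
            "description": "Table name the query references",
        },
    },
    "required": ["query"],
}
```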

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose: 'Test a KQL query with mock data using a datatable. Validates KQL locally first.' It specifies the verb ('test'), resource ('KQL query'), and method ('with mock data using a datatable'), making the purpose unambiguous. However, it doesn't explicitly differentiate from its sibling 'sentinel_logs_search' or other query-related tools, which prevents a perfect score.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives. It mentions 'validates KQL locally first,' which implies a testing or validation context, but doesn't specify when to choose this over 'sentinel_logs_search' (for real data) or 'sentinel_query_validate' (for validation without mock data). Without explicit usage context or exclusions, the agent lacks clear direction.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
