ms-sentinel-mcp-server

by dstreefkerk

sentinel_logs_search

Search and analyze Azure Sentinel security logs using Kusto Query Language (KQL) queries to investigate security incidents and monitor threats.

Instructions

Run a KQL query against Azure Monitor

Input Schema

Name | Required | Description | Default
kwargs | Yes | |
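
The schema lists only an opaque `kwargs` entry. Judging from the `run` docstring in the implementation reference on this page, the tool expects a required `query` and an optional `timespan` (default `"1d"`). The sketch below shows one plausible argument shape. `extract_param` is a hypothetical stand-in for `MCPToolBase._extract_param`, whose real logic is not shown here, and the fallback to a nested `kwargs` object is an assumption rather than documented behavior. The KQL query is illustrative only.

```python
from collections.abc import Mapping
from typing import Any


def extract_param(kwargs: Mapping, name: str, default: Any = None) -> Any:
    """Hypothetical stand-in for MCPToolBase._extract_param (real logic not shown)."""
    # Prefer a top-level argument, e.g. {"query": "..."}.
    if name in kwargs:
        return kwargs[name]
    # Assumption: some clients wrap the arguments as {"kwargs": {...}}.
    nested = kwargs.get("kwargs")
    if isinstance(nested, Mapping) and name in nested:
        return nested[name]
    return default


# Example arguments in both shapes.
flat_args = {"query": "SecurityIncident | take 10", "timespan": "12h"}
nested_args = {"kwargs": {"query": "SecurityIncident | take 10"}}

print(extract_param(flat_args, "query"))
print(extract_param(nested_args, "timespan", "1d"))  # falls back to the default
```

Whichever shape a client uses, `query` must be non-empty: the implementation returns an error result when it is missing.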

Implementation Reference

  • The async run method that executes the KQL query against Azure Monitor Logs using the logs client, handles parameters like query and timespan, processes results into JSON-safe format, and returns structured results or errors.
    async def run(self, ctx: Context, **kwargs):
        """
        Run a KQL query against Azure Monitor Logs.
    
        Args:
            ctx (Context): The MCP context.
            **kwargs: Should include 'query' and optional 'timespan'.
    
        Returns:
            dict: Query results and metadata, or error information.
        """
        # Extract parameters using the centralized parameter extraction from MCPToolBase
        query = self._extract_param(kwargs, "query")
        timespan = self._extract_param(kwargs, "timespan", "1d")
        logger = self.logger
        if not query:
            logger.error("Missing required parameter: query")
            return {
                "valid": False,
                "errors": ["Missing required parameter: query"],
                "error": "Missing required parameter: query",
                "result_count": 0,
                "columns": [],
                "rows": [],
                "warnings": ["Missing required parameter: query"],
                "message": "Missing required parameter: query",
            }
    
        logs_client, workspace_id = self.get_logs_client_and_workspace(ctx)
        if logs_client is None or workspace_id is None:
            logger.error(
                "Azure Monitor Logs client or workspace_id is not initialized."
            )
            return {
                "valid": False,
                "errors": [
                    (
                        "Azure Monitor Logs client or workspace_id is not initialized. "
                        # noqa: E501
                        "Check your credentials and configuration."
                    )
                ],
                "error": (
                    "Azure Monitor Logs client or workspace_id is not initialized. "
                    # noqa: E501
                    "Check your credentials and configuration."
                ),
                "result_count": 0,
                "columns": [],
                "rows": [],
                "warnings": [
                    "Azure Monitor Logs client or workspace_id is not initialized."
                ],
                "message": "Azure Monitor Logs client or workspace_id is not initialized.",
            }
    
        start_time = time.perf_counter()
        timespan_obj = None
        try:
            if timespan:
                if timespan.endswith("d"):
                    timespan_obj = timedelta(days=int(timespan[:-1]))
                elif timespan.endswith("h"):
                    timespan_obj = timedelta(hours=int(timespan[:-1]))
                elif timespan.endswith("m"):
                    timespan_obj = timedelta(minutes=int(timespan[:-1]))
                else:
                    timespan_obj = timedelta(days=1)  # noqa: E501
        except Exception as e:
            logger.error("Invalid timespan format: %s", e)
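            return {
                # Keep the same keys as the other error results in this method.
                "valid": False,
                "errors": ["Invalid timespan format: %s" % str(e)],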
                "error": (
                    "Invalid timespan format: %s. Use format like '1d', '12h', or '30m'."
                    % str(e)
                ),
                "result_count": 0,
                "columns": [],
                "rows": [],
                "warnings": ["Invalid timespan format: %s" % str(e)],
                "message": "Invalid timespan format: %s" % str(e),
            }
    
        warnings = []
        match = re.search(r"\b(take|limit)\s+(\d+)", query, re.IGNORECASE)
        if match:
            n = int(match.group(2))
            if n > 250:
                warnings.append(
                    f"Large result set requested ({n} rows). "
                    "Consider using a smaller limit for better performance."
                )  # noqa: E501
    
        try:
            # Execute the query using task manager for async compatibility
            response = await run_in_thread(
                logs_client.query_workspace,
                workspace_id=workspace_id,
                query=query,
                timespan=timespan_obj,
                name=f"query_logs_{hash(query) % 10000}",
            )
            exec_time_ms = int((time.perf_counter() - start_time) * 1000)
    
            def make_json_safe(val):
                # datetime is a subclass of date; both serialize via isoformat()
                if isinstance(val, (datetime, date)):
                    return val.isoformat()
                return val
    
            def get_col_info(col, idx):
                # Azure SDK columns may have name/type/ordinal attributes,
                # or just be strings
                return {
                    "name": getattr(col, "name", col),
                    "type": getattr(col, "type", getattr(col, "column_type", "string")),
                    "ordinal": getattr(col, "ordinal", idx),
                }
    
            if response and getattr(response, "tables", None):
                table = response.tables[0]
                columns = [
                    get_col_info(col, idx) for idx, col in enumerate(table.columns)
                ]
                rows = [list(row) for row in table.rows]
                dict_rows = [
                    {
                        col["name"]: make_json_safe(cell)
                        for col, cell in zip(columns, row)
                    }
                    for row in rows
                ]
                result_obj = {
                    "valid": True,
                    "errors": [],
                    "query": query,
                    "timespan": timespan,
                    "result_count": len(dict_rows),
                    "columns": columns,
                    "rows": dict_rows,
                    "execution_time_ms": exec_time_ms,
                    "warnings": warnings,
                    "message": "Query executed successfully",
                }
                return result_obj
            else:
                result_obj = {
                    "valid": True,
                    "errors": [],
                    "query": query,
                    "timespan": timespan,
                    "result_count": 0,
                    "columns": [],
                    "rows": [],
                    "execution_time_ms": int((time.perf_counter() - start_time) * 1000),
                    "warnings": warnings,
                    "message": "Query returned no tables or results",
                }
                return result_obj
    
        except TimeoutError:
            logger.error("Query timed out after 60 seconds")
            return {
                "valid": False,
                "errors": ["Query timed out after 60 seconds"],
                "error": "Query timed out after 60 seconds",
                "result_count": 0,
                "columns": [],
                "rows": [],
                "warnings": ["Query timed out after 60 seconds"],
                "message": "Query timed out after 60 seconds",
            }
        except Exception as e:
            logger.error("Error executing logs query: %s", str(e), exc_info=True)
            return {
                "valid": False,
                "errors": [f"Error executing query: {str(e)}"],
                "error": f"Error executing query: {str(e)}",
                "result_count": 0,
                "columns": [],
                "rows": [],
                "warnings": [f"Error executing query: {str(e)}"],
                "message": f"Error executing query: {str(e)}",
            }
  • Class-level attributes defining the tool's name and description for MCP registration and usage.
    name = "sentinel_logs_search"
    description = "Run a KQL query against Azure Monitor"
  • Function that registers the SentinelLogsSearchTool (and dummy variant) with the FastMCP server instance.
    def register_tools(mcp: FastMCP):
        """
        Register Azure Monitor query tools with the MCP server.
    
        Args:
            mcp (FastMCP): The MCP server.
        """
        SentinelLogsSearchTool.register(mcp)
        SentinelLogsSearchWithDummyDataTool.register(mcp)
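
The timespan handling inside `run` can be read in isolation. The following is a minimal sketch of that step as a standalone function, assuming the same three suffixes (`d`, `h`, `m`). `parse_timespan` is an illustrative name and not part of the server.

```python
from datetime import timedelta
from typing import Optional

# Suffix -> timedelta keyword, matching the three units handled in run().
_UNITS = {"d": "days", "h": "hours", "m": "minutes"}


def parse_timespan(timespan: Optional[str]) -> Optional[timedelta]:
    """Convert strings such as '1d', '12h' or '30m' into a timedelta."""
    if not timespan:
        # run() leaves the timespan unset when no value is supplied.
        return None
    unit = _UNITS.get(timespan[-1:])
    if unit is None:
        # Same fallback as run(): an unrecognized suffix silently becomes one day.
        return timedelta(days=1)
    # int() raises ValueError for input like 'xd'; run() reports that as an error.
    return timedelta(**{unit: int(timespan[:-1])})


print(parse_timespan("12h"))
```

Note the asymmetry this makes visible: a value with an unknown suffix (for example `2w`) is accepted and treated as one day, while a known suffix with a non-numeric prefix is rejected.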
Behavior: 1/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden of behavioral disclosure. It fails to describe key traits: whether this is a read-only or mutating operation, authentication requirements, rate limits, error handling, or what the output looks like (e.g., raw logs, structured data). The description only states the action without behavioral context.
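
On the output question, the implementation reference on this page shows that a successful call returns a dict with `valid`, `errors`, `query`, `timespan`, `result_count`, `columns`, `rows`, `execution_time_ms`, `warnings`, and `message`. The sketch below reproduces only the row-shaping step with plain Python values. `shape_rows` is an illustrative name, the sample data is made up, and columns are simplified to bare names (the implementation stores a name/type/ordinal dict per column).

```python
from datetime import date, datetime


def make_json_safe(val):
    # datetime is a subclass of date, so one check covers both.
    if isinstance(val, date):
        return val.isoformat()
    return val


def shape_rows(column_names, rows):
    """Pair each cell with its column name, as run() does for the first table."""
    return [
        {name: make_json_safe(cell) for name, cell in zip(column_names, row)}
        for row in rows
    ]


# Made-up sample data for illustration.
columns = ["TimeGenerated", "Title"]
rows = [[datetime(2024, 1, 2, 3, 4, 5), "Example incident"]]

result = {
    "valid": True,
    "errors": [],
    "result_count": len(rows),
    "columns": columns,
    "rows": shape_rows(columns, rows),
}
print(result["rows"])
```

Error results use the same envelope with `valid` set to `False` and empty `columns` and `rows`, so a caller can check `valid` before reading the data.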

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, efficient sentence with zero waste—it directly states the tool's action and target. It's appropriately sized for a basic description, though its brevity contributes to gaps in other dimensions.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 1/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity (a query tool with no annotations, 0% schema coverage, no output schema, and many sibling tools), the description is incomplete. It lacks essential details: parameter guidance, behavioral traits, usage context, and output expectations. This makes it inadequate for an agent to use the tool effectively.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 1/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 0%, with one undocumented parameter 'kwargs' of type string. The description adds no meaning beyond the schema—it doesn't explain what 'kwargs' should contain (e.g., KQL query syntax, time range, workspace ID), format, or examples. This leaves the parameter semantics entirely unclear.
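
One constraint is visible only in the implementation reference: `run` scans the query for a `take` or `limit` clause and attaches a warning when the requested row count exceeds 250. A minimal standalone sketch of that check follows. `large_limit_warning` and its `threshold` parameter are illustrative names, not part of the server.

```python
import re
from typing import Optional

# Same pattern as in run(): the first 'take N' or 'limit N', case-insensitive.
_LIMIT_RE = re.compile(r"\b(take|limit)\s+(\d+)", re.IGNORECASE)


def large_limit_warning(query: str, threshold: int = 250) -> Optional[str]:
    """Return a warning string if the query asks for more than `threshold` rows."""
    match = _LIMIT_RE.search(query)
    if match is None:
        return None
    n = int(match.group(2))
    if n <= threshold:
        return None
    return (
        f"Large result set requested ({n} rows). "
        "Consider using a smaller limit for better performance."
    )


print(large_limit_warning("SecurityEvent | take 500"))
```

The warning is advisory only: in the implementation the query still runs, and the message is returned in the `warnings` list alongside the rows.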

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 3/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description states the action ('Run a KQL query') and target ('against Azure Monitor'), which gives a basic purpose. However, it's vague about what 'Azure Monitor' specifically refers to in this context (e.g., logs, metrics, specific tables) and doesn't distinguish it from sibling tools like 'sentinel_logs_search_with_dummy_data' or 'log_analytics_saved_searches_list', which limits its clarity.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No guidance is provided on when to use this tool versus alternatives. It doesn't mention prerequisites (e.g., authentication, workspace context), typical use cases (e.g., security investigations, monitoring), or exclusions (e.g., when to use 'sentinel_logs_search_with_dummy_data' for testing). This leaves the agent without context for selection.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dstreefkerk/ms-sentinel-mcp-server'
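
For scripted use, the same request can be made from Python's standard library. This is a minimal sketch: the function names are illustrative, and it assumes the endpoint returns a JSON body, which this page does not state.

```python
import json
import urllib.request

BASE_URL = "https://glama.ai/api/mcp/v1/servers"


def build_request(owner: str, server: str) -> urllib.request.Request:
    """Build the same GET request as the curl command."""
    return urllib.request.Request(f"{BASE_URL}/{owner}/{server}", method="GET")


def fetch_server_info(owner: str, server: str, timeout: float = 10.0):
    """Send the request and decode the body (assumed to be JSON)."""
    request = build_request(owner, server)
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Calling `fetch_server_info("dstreefkerk", "ms-sentinel-mcp-server")` performs the network request; `build_request` alone can be used to inspect the URL without sending anything.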

If you have feedback or need assistance with the MCP directory API, please join our Discord server.