Skip to main content
Glama
dstreefkerk

ms-sentinel-mcp-server

by dstreefkerk

sentinel_logs_table_details_get

Retrieve metadata, retention settings, and row count information for Log Analytics tables in Microsoft Sentinel.

Instructions

Get details (metadata, retention, row count, etc.) for a Log Analytics table

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
kwargsYes

Implementation Reference

  • The `async def run` method in `GetTableDetailsTool` class executes the tool logic. It retrieves table details including retention policies, metadata from Azure Management REST API, and row count/lastUpdated from KQL queries on the Log Analytics workspace.
    async def run(self, ctx: Context, **kwargs):
        """
        Get details (metadata, retention, row count, etc.) for a Log Analytics table.
    
        Args:
            ctx (Context): The MCP tool context.
            **kwargs: Must include 'table_name'.
    
        Returns:
            dict: Results as described in the class docstring.
        """
        table_name = self._extract_param(kwargs, "table_name")
        if not table_name:
            return {"error": "Missing required parameter: table_name"}
        logs_client, workspace_id = self.get_logs_client_and_workspace(ctx)
        cache_key = f"table_details_json:{workspace_id}:{table_name}"
        cached = cache.get(cache_key)
        if cached:
            return cached
        # Get Azure context
        resource_group = None
        workspace_name = None
        subscription_id = None
        if (
            hasattr(ctx, "request_context")
            and getattr(ctx, "request_context", None) is not None
        ):
            services_ctx = ctx.request_context.lifespan_context
            logs_client = getattr(services_ctx, "logs_client", None)
            workspace_id = getattr(services_ctx, "workspace_id", None)
            # We no longer need rest_client as we're using direct API calls
            resource_group = getattr(services_ctx, "resource_group", None)
            workspace_name = getattr(services_ctx, "workspace_name", None)
            subscription_id = getattr(services_ctx, "subscription_id", None)
        errors = []
        result = {"table": table_name}
        # --- REST API METADATA ---
        try:
            if resource_group and workspace_name and subscription_id:
                # We'll use the call_api method directly, no need to get the client separately
    
                # Construct the URL with API version 2017-04-26-preview as recommended
                # pylint: disable=line-too-long
                # ruff: noqa: E501
                url = (
                    f"https://management.azure.com/subscriptions/{subscription_id}/"
                    f"resourceGroups/{resource_group}/providers/Microsoft.OperationalInsights/"
                    f"workspaces/{workspace_name}/tables/{table_name}?api-version=2017-04-26-preview"
                )
    
                # Make the direct REST API call using the task manager
                try:
                    # Use the call_api method from the base class
                    table_data = await self.call_api(
                        ctx, "GET", url, name=f"get_table_details_{table_name}"
                    )
    
                    # Process the response
                    if table_data and "properties" in table_data:
                        props = table_data["properties"]
    
                        # Extract metadata properties (camelCase format from API)
                        result["retentionInDays"] = props.get("retentionInDays")
                        result["totalRetentionInDays"] = props.get(
                            "totalRetentionInDays"
                        )
                        if (
                            props.get("totalRetentionInDays") is not None
                            and props.get("retentionInDays") is not None
                        ):
                            result["archiveRetentionInDays"] = (
                                props["totalRetentionInDays"] - props["retentionInDays"]
                            )
                        else:
                            result["archiveRetentionInDays"] = None
    
                        # Extract other metadata fields
                        result["plan"] = props.get("plan")
                        result["provisioningState"] = props.get("provisioningState")
    
                        # Extract schema-related properties if available
                        if "schema" in props:
                            schema = props["schema"]
                            result["tableType"] = schema.get("tableType")
                            result["description"] = schema.get("description")
                        else:
                            result["tableType"] = props.get("tableType")
                            result["description"] = props.get("description")
    
                        # Extract other properties
                        result["isInherited"] = props.get("isInherited")
                        result["isTotalRetentionInherited"] = props.get(
                            "isTotalRetentionInherited"
                        )
                        self.logger.info(
                            "Successfully retrieved metadata for table: %s", table_name
                        )
                    else:
                        errors.append(
                            "REST API: No properties found in table metadata response."
                        )
                        self.logger.error(
                            "No properties found in table metadata response for: %s",
                            table_name,
                        )
    
                except StopIteration:
                    # Handle case where no data is returned
                    errors.append("REST API: No data returned for table metadata.")
                    self.logger.error(
                        "No data returned for table metadata: %s", table_name
                    )
                except Exception as e:
                    errors.append("REST API call error: %s" % str(e))
                    self.logger.error(
                        "Error during REST API call for table %s: %s", table_name, e
                    )
            else:
                errors.append(
                    "REST API: Missing required parameters for table metadata retrieval."
                )
                self.logger.error(
                    "Missing required parameters: resource_group=%s, workspace_name=%s, subscription_id=%s",
                    resource_group,
                    workspace_name,
                    subscription_id,
                )
        except Exception as e:
            errors.append("REST API client error: %s" % str(e))
        # --- KQL METADATA ---
        if logs_client:
            # Query for lastUpdated
            try:
                kql_last_updated = (
                    f"{table_name}\n| summarize lastUpdated=max(TimeGenerated)"
                )
                last_updated_resp = await run_in_thread(
                    logs_client.query_workspace,
                    workspace_id=workspace_id,
                    query=kql_last_updated,
                    timespan=timedelta(days=30),
                    name="get_table_last_updated",
                )
                if (
                    last_updated_resp
                    and last_updated_resp.tables
                    and len(last_updated_resp.tables[0].rows) > 0
                ):
                    row = last_updated_resp.tables[0].rows[0]
                    result["lastUpdated"] = row[0]
                else:
                    result["lastUpdated"] = None
            except TimeoutError:
                errors.append(
                    "KQL timeout: lastUpdated query exceeded time limit (30 days)"
                )
                result["lastUpdated"] = None
            except Exception as e:
                errors.append(f"KQL error (lastUpdated): {str(e)}")
                result["lastUpdated"] = None
            # Query for rowCount
            try:
                kql_row_count = f"{table_name}\n| count"
                row_count_resp = await run_in_thread(
                    logs_client.query_workspace,
                    workspace_id=workspace_id,
                    query=kql_row_count,
                    timespan=timedelta(days=30),
                    name="get_table_row_count",
                )
                if (
                    row_count_resp
                    and row_count_resp.tables
                    and len(row_count_resp.tables[0].rows) > 0
                ):
                    row = row_count_resp.tables[0].rows[0]
                    result["rowCount"] = row[0]
                else:
                    result["rowCount"] = 0
            except TimeoutError:
                errors.append(
                    "KQL timeout: rowCount query exceeded time limit (30 days)"
                )
                result["rowCount"] = 0
            except Exception as e:
                errors.append(f"KQL error (rowCount): {str(e)}")
                result["rowCount"] = 0
        else:
            errors.append("logs_client missing.")
        if errors:
            result["errors"] = errors
        return result
  • The `register_tools` function in table_tools.py calls `GetTableDetailsTool.register(mcp)` to register the tool with the MCP instance.
    def register_tools(mcp):
        """
        Register all table tools with the given MCP instance.
    
        Args:
            mcp: The MCP instance to register tools with.
        """
        ListTablesTool.register(mcp)
        GetTableSchemaTool.register(mcp)
        GetTableDetailsTool.register(mcp)
  • server.py:236-238 (registration)
    The server.py loads Python modules from the tools/ directory via `load_components` and invokes their `register_tools(mcp)` functions, thereby registering all tools including `sentinel_logs_table_details_get`.
    if os.path.exists(tools_dir):
        tools = load_components(mcp, tools_dir, "register_tools")
        logger.info("Auto-registered %d tool modules", len(tools))

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dstreefkerk/ms-sentinel-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server