ms-sentinel-mcp-server

by dstreefkerk

sentinel_logs_table_schema_get

Retrieve the schema including column names and data types for a specified Log Analytics table in Microsoft Sentinel.

Instructions

Get schema (columns/types) for a Log Analytics table

Input Schema

Name      Required    Description    Default
kwargs    Yes
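
The input schema lists only kwargs, while the run method's docstring further down says kwargs must include 'table_name'. The sketch below shows one way the arguments might be shaped and read. It assumes the table name may arrive either at the top level or nested under a 'kwargs' key; extract_param is a hypothetical stand-in for the server's _extract_param, whose actual behavior is not shown on this page, and SecurityEvent is only an example table name.

```python
from typing import Any, Dict, Optional


def extract_param(kwargs: Dict[str, Any], name: str) -> Optional[Any]:
    """Hypothetical helper, not the server's real _extract_param.

    Looks for `name` at the top level first, then inside a nested
    'kwargs' dict, and returns None when it is found in neither place.
    """
    if name in kwargs:
        return kwargs[name]
    nested = kwargs.get("kwargs")
    if isinstance(nested, dict):
        return nested.get(name)
    return None


# Assumed argument shape for a tool call; the nesting is an assumption.
arguments = {"kwargs": {"table_name": "SecurityEvent"}}
table_name = extract_param(arguments, "table_name")
```

If table_name comes back as None, the tool as listed returns {"error": "Missing required parameter: table_name"} rather than raising.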

Implementation Reference

  • The async run method of GetTableSchemaTool class, which implements the core logic: extracts table_name, queries using KQL '| getschema', parses columns/types, caches and returns schema.
    async def run(self, ctx: Context, **kwargs):
        """
        Get the schema (columns/types) for a Log Analytics table.
    
        Args:
            ctx (Context): The MCP tool context.
            **kwargs: Must include 'table_name'.
    
        Returns:
            dict: Results as described in the class docstring.
        """
        table_name = self._extract_param(kwargs, "table_name")
        if not table_name:
            return {"error": "Missing required parameter: table_name"}
        logs_client, workspace_id = self.get_logs_client_and_workspace(ctx)
        cache_key = f"table_schema_json:{workspace_id}:{table_name}"
        cached = cache.get(cache_key)
        if cached:
            return cached
        if logs_client is None:
            result = {
                "error": (
                    "Azure Logs client is not initialized. "
                    "Check your credentials and configuration."
                )
            }
            cache.set(cache_key, result)
            return result
        try:
            kql_schema = f"{table_name} | getschema"
            response = await run_in_thread(
                logs_client.query_workspace,
                workspace_id=workspace_id,
                query=kql_schema,
                timespan=timedelta(days=1),
                name="get_table_schema",
            )
            schema = []
            if response and response.tables and len(response.tables[0].rows) > 0:
                columns = response.tables[0].columns
                rows = response.tables[0].rows
                # Try to find the canonical getschema columns
                col_name_idx = col_type_idx = col_data_type_idx = col_ordinal_idx = None
    
                # Determine if columns are strings or objects
                def col_name(col):
                    """Return column name as lowercase string."""
                    return col.lower() if isinstance(col, str) else col.name.lower()
    
                for idx, col in enumerate(columns):
                    cname = col_name(col)
                    if cname == "columnname":
                        col_name_idx = idx
                    elif cname == "columntype":
                        col_type_idx = idx
                    elif cname == "datatype":
                        col_data_type_idx = idx
                    elif cname == "columnordinal":
                        col_ordinal_idx = idx
                if col_name_idx is not None and col_type_idx is not None:
                    # Return all metadata if available
                    for row in rows:
                        entry = {"name": row[col_name_idx], "type": row[col_type_idx]}
                        if col_data_type_idx is not None:
                            entry["dataType"] = row[col_data_type_idx]
                        if col_ordinal_idx is not None:
                            entry["ordinal"] = row[col_ordinal_idx]
                        schema.append(entry)
                else:
                    # Fallback: return all columns for each row
                    for row in rows:
                        schema.append(
                            {
                                # Columns may be plain strings or objects with .name
                                (col if isinstance(col, str) else col.name): row[i]
                                for i, col in enumerate(columns)
                            }
                        )
                result = {"table": table_name, "schema": schema}
                cache.set(cache_key, result)
                return result
            result = {
                "table": table_name,
                "schema": [],
                "error": f"No schema found for table {table_name}.",
            }
            cache.set(cache_key, result)
            return result
        except Exception as e:
            result = {"error": "Failed to get table schema: %s" % str(e)}
            self.logger.error("Failed to get table schema: %s", str(e))
            cache.set(cache_key, result)
            return result
  • The GetTableSchemaTool class defining the tool handler, including name, description, and run method for sentinel_logs_table_schema_get.
    class GetTableSchemaTool(MCPToolBase):
        """
        Tool to get schema (columns/types) for a Log Analytics table.
    
        Returns:
            dict: {
                'table': str,            # Table name
                'schema': list,          # List of schema column metadata
                'error': str (optional)  # Error message if applicable
            }
        """
    
        name = "sentinel_logs_table_schema_get"
        description = "Get schema (columns/types) for a Log Analytics table"
    
        async def run(self, ctx: Context, **kwargs):
            """
            Get the schema (columns/types) for a Log Analytics table.
    
            Args:
                ctx (Context): The MCP tool context.
                **kwargs: Must include 'table_name'.
    
            Returns:
                dict: Results as described in the class docstring.
            """
            table_name = self._extract_param(kwargs, "table_name")
            if not table_name:
                return {"error": "Missing required parameter: table_name"}
            logs_client, workspace_id = self.get_logs_client_and_workspace(ctx)
            cache_key = f"table_schema_json:{workspace_id}:{table_name}"
            cached = cache.get(cache_key)
            if cached:
                return cached
            if logs_client is None:
                result = {
                    "error": (
                        "Azure Logs client is not initialized. "
                        "Check your credentials and configuration."
                    )
                }
                cache.set(cache_key, result)
                return result
            try:
                kql_schema = f"{table_name} | getschema"
                response = await run_in_thread(
                    logs_client.query_workspace,
                    workspace_id=workspace_id,
                    query=kql_schema,
                    timespan=timedelta(days=1),
                    name="get_table_schema",
                )
                schema = []
                if response and response.tables and len(response.tables[0].rows) > 0:
                    columns = response.tables[0].columns
                    rows = response.tables[0].rows
                    # Try to find the canonical getschema columns
                    col_name_idx = col_type_idx = col_data_type_idx = col_ordinal_idx = None
    
                    # Determine if columns are strings or objects
                    def col_name(col):
                        """Return column name as lowercase string."""
                        return col.lower() if isinstance(col, str) else col.name.lower()
    
                    for idx, col in enumerate(columns):
                        cname = col_name(col)
                        if cname == "columnname":
                            col_name_idx = idx
                        elif cname == "columntype":
                            col_type_idx = idx
                        elif cname == "datatype":
                            col_data_type_idx = idx
                        elif cname == "columnordinal":
                            col_ordinal_idx = idx
                    if col_name_idx is not None and col_type_idx is not None:
                        # Return all metadata if available
                        for row in rows:
                            entry = {"name": row[col_name_idx], "type": row[col_type_idx]}
                            if col_data_type_idx is not None:
                                entry["dataType"] = row[col_data_type_idx]
                            if col_ordinal_idx is not None:
                                entry["ordinal"] = row[col_ordinal_idx]
                            schema.append(entry)
                    else:
                        # Fallback: return all columns for each row
                        for row in rows:
                            schema.append(
                                {
                                    # Columns may be plain strings or objects with .name
                                    (col if isinstance(col, str) else col.name): row[i]
                                    for i, col in enumerate(columns)
                                }
                            )
                    result = {"table": table_name, "schema": schema}
                    cache.set(cache_key, result)
                    return result
                result = {
                    "table": table_name,
                    "schema": [],
                    "error": f"No schema found for table {table_name}.",
                }
                cache.set(cache_key, result)
                return result
            except Exception as e:
                result = {"error": "Failed to get table schema: %s" % str(e)}
                self.logger.error("Failed to get table schema: %s", str(e))
                cache.set(cache_key, result)
                return result
  • The register_tools function that registers GetTableSchemaTool (sentinel_logs_table_schema_get) with the MCP instance.
    def register_tools(mcp):
        """
        Register all table tools with the given MCP instance.
    
        Args:
            mcp: The MCP instance to register tools with.
        """
        ListTablesTool.register(mcp)
        GetTableSchemaTool.register(mcp)
        GetTableDetailsTool.register(mcp)
  • Docstring in GetTableSchemaTool describing the tool's input (table_name) and output schema/format.
    """
    Tool to get schema (columns/types) for a Log Analytics table.
    
    Returns:
        dict: {
            'table': str,            # Table name
            'schema': list,          # List of schema column metadata
            'error': str (optional)  # Error message if applicable
        }
    """

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dstreefkerk/ms-sentinel-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.