sentinel_logs_tables_list
List available tables in Microsoft Sentinel Log Analytics workspaces to identify data sources for querying and analysis.
Instructions
List available tables in the Log Analytics workspace
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
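| kwargs | Yes | Optional `filter_pattern` (str) to filter table names; optional `include_stats` (bool) to include row counts and last updated times. | |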
Implementation Reference
# tools/table_tools.py:15-166 (handler) ListTablesTool class defines the
# 'sentinel_logs_tables_list' tool. Includes name, description, and the full
# async run() method that implements the core logic: extracts parameters, gets
# logs client, checks cache, executes KQL queries to list tables (with optional
# stats and filtering), processes results, handles errors, and caches output.
class ListTablesTool(MCPToolBase):
    """
    Tool to list available tables in the Log Analytics workspace.

    Parameters:
        filter_pattern (str, optional): Pattern to filter table names. The
            value is escaped before being embedded in the KQL query.
        include_stats (bool, optional): Include row counts and last updated times (default: False)
            WARNING: Setting to True can be slow in large environments

    Returns:
        dict: {
            'found': int,                # Number of tables found
            'tables': list,              # List of table metadata dicts
                                         # If include_stats=False: [{'name': str}]
                                         # If include_stats=True: [{'name': str, 'lastUpdated': str, 'rowCount': int}]
            'error': str (optional)      # Error message if applicable
        }
    """

    name = "sentinel_logs_tables_list"
    description = "List available tables in the Log Analytics workspace"

    @staticmethod
    def _escape_kql_string(value):
        """Escape *value* so it is safe inside a double-quoted KQL string literal."""
        text = str(value)
        # Backslash must be escaped first, otherwise the escapes added for the
        # other characters would themselves be doubled.
        text = text.replace("\\", "\\\\").replace('"', '\\"')
        return text.replace("\n", "\\n").replace("\t", "\\t")

    @classmethod
    def _build_query(cls, filter_pattern, include_stats):
        """Return the ``(query, timespan)`` pair for the requested listing mode."""
        if include_stats:
            # Full query with stats (expensive)
            lines = [
                "search *",
                "| distinct $table",
                "| extend TableName = $table",
                "| project-away $table",
                "| join kind=leftouter (",
                "    union withsource=TableSource *",
                "    | summarize LastUpdate=max(TimeGenerated),",
                "        RowCount=count() by TableSource",
                "    | project TableSource, LastUpdate, RowCount",
                ") on $left.TableName == $right.TableSource",
                "| project name=TableName, lastUpdated=LastUpdate, rowCount=RowCount",
            ]
            name_column = "name"
            timespan = timedelta(days=30)  # Reduced from 90 days for better performance
        else:
            # Simple query for table names only (fast)
            lines = [
                "search *",
                "| distinct $table",
                "| project TableName = $table",
            ]
            name_column = "TableName"
            timespan = timedelta(days=1)  # Minimal timespan for fast query

        if filter_pattern:
            # The pattern comes from the tool caller; escape it so it cannot
            # terminate the string literal and alter the query.
            escaped = cls._escape_kql_string(filter_pattern)
            lines.append(f'| where {name_column} contains "{escaped}"')
        lines.append(f"| order by {name_column} asc")
        return "\n".join(lines), timespan

    async def run(self, ctx: Context, **kwargs):
        """
        List available tables in the Log Analytics workspace.

        Args:
            ctx (Context): The MCP tool context.
            **kwargs: Optional filter_pattern to filter table names.
                Optional include_stats (bool) to include row counts and last updated times.

        Returns:
            dict: Results as described in the class docstring. Failures are
            reported through the 'error' key rather than raised.
        """
        filter_pattern = self._extract_param(kwargs, "filter_pattern", "")
        include_stats = self._extract_param(kwargs, "include_stats", False)
        logs_client, workspace_id = self.get_logs_client_and_workspace(ctx)

        cache_key = f"tables_json:{workspace_id}:{filter_pattern}:{include_stats}"
        cached = cache.get(cache_key)
        if cached:
            return cached

        # NOTE(review): error results are cached below exactly like successful
        # ones, so a transient failure is replayed until the cache entry
        # expires — confirm this is intended.
        if logs_client is None:
            result = {
                "error": (
                    "Azure Logs client is not initialized. "
                    "Check your credentials and configuration."
                )
            }
            cache.set(cache_key, result)
            return result

        try:
            query, timespan = self._build_query(filter_pattern, include_stats)

            response = await run_in_thread(
                logs_client.query_workspace,
                workspace_id=workspace_id,
                query=query,
                timespan=timespan,
                name="list_tables_info" if include_stats else "list_table_names",
            )

            if response and response.tables and len(response.tables[0].rows) > 0:
                rows = response.tables[0].rows
                if include_stats:
                    # Full mode: include stats
                    tables = [
                        {"name": row[0], "lastUpdated": row[1], "rowCount": row[2]}
                        for row in rows
                    ]
                else:
                    # Simple mode: only table names
                    tables = [{"name": row[0]} for row in rows]
                result = {"found": len(tables), "tables": tables}
                cache.set(cache_key, result)
                return result

            result = {
                "found": 0,
                "tables": [],
                "error": (
                    "No tables found. The workspace may be empty "
                    "or you may not have access to the data."
                ),
            }
            cache.set(cache_key, result)
            return result
        except TimeoutError:
            error_msg = (
                "Query timed out. The workspace may have too many tables or too much data. "
                "Try using include_stats=False for faster results, or use a filter_pattern to reduce the scope."
            )
            result = {"error": error_msg}
            self.logger.error("Query timeout in list tables: %s", error_msg)
            cache.set(cache_key, result)
            return result
        except Exception as e:
            # Top-level tool boundary: report the failure to the caller
            # instead of propagating it.
            result = {"error": f"Failed to list tables: {e}"}
            self.logger.error("Failed to list tables: %s", str(e))
            cache.set(cache_key, result)
            return result
- tools/table_tools.py:16-32 (schema) Class docstring provides the tool schema: input parameters filter_pattern (str, optional) and include_stats (bool, optional); output dict with 'found', a 'tables' list (with optional stats), and an optional 'error'.
  """
  Tool to list available tables in the Log Analytics workspace.

  Parameters:
      filter_pattern (str, optional): Pattern to filter table names
      include_stats (bool, optional): Include row counts and last updated times (default: False)
          WARNING: Setting to True can be slow in large environments

  Returns:
      dict: {
          'found': int,                # Number of tables found
          'tables': list,              # List of table metadata dicts
                                       # If include_stats=False: [{'name': str}]
                                       # If include_stats=True: [{'name': str, 'lastUpdated': str, 'rowCount': int}]
          'error': str (optional)      # Error message if applicable
      }
  """
# tools/table_tools.py:485-495 (registration) register_tools(mcp) function
# registers the ListTablesTool (along with other table tools) by calling
# ListTablesTool.register(mcp). This is loaded dynamically by server.py.
def register_tools(mcp):
    """
    Attach every table tool to the supplied MCP instance.

    Args:
        mcp: The MCP instance each tool class registers itself with.
    """
    # Registration order matches the original: list, schema, details.
    table_tool_classes = (ListTablesTool, GetTableSchemaTool, GetTableDetailsTool)
    for tool_cls in table_tool_classes:
        tool_cls.register(mcp)