
ms-sentinel-mcp-server

by dstreefkerk

log_analytics_saved_searches_list

Retrieve all saved search queries from a Log Analytics workspace to monitor and analyze security data efficiently.

Instructions

List all saved searches in a Log Analytics workspace

Input Schema

Name    Required  Description  Default
kwargs  Yes

Implementation Reference

  • The async run method that executes the tool logic: it validates the Azure context, initializes the Log Analytics client, lists the saved searches, converts them into a structured list, and returns the result.
    async def run(self, ctx: Context, **kwargs):
        """
        List all saved searches in the specified Log Analytics workspace.
    
        Args:
            ctx (Context): The FastMCP context containing authentication and request information.
            **kwargs: Additional keyword arguments (unused).
    
        Returns:
            dict: Dictionary containing the list of saved searches, count, and validity flag.
        """
        # Get Azure context
        workspace_name, resource_group, subscription_id = self.get_azure_context(ctx)
    
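        # Check that the Azure SDK is importable before validating the context
        sdk_available = True
        try:
            # find_spec only locates the module; a missing parent package
            # ("azure") raises ModuleNotFoundError, which ImportError catches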
            import importlib.util  # pylint: disable=import-outside-toplevel
    
            sdk_available = (
                importlib.util.find_spec("azure.mgmt.loganalytics") is not None
            )
        except ImportError:
            sdk_available = False
    
        if not self.validate_azure_context(
            sdk_available, workspace_name, resource_group, subscription_id, self.logger
        ):
            return {"error": "Missing Azure SDK or workspace details."}
    
        # Get Log Analytics client
        client = None
        try:
            client = self.get_loganalytics_client(subscription_id)
        except Exception as e:
            self.logger.error("Error initializing Azure LogAnalytics client: %s", e)
            return {"error": "Azure LogAnalytics client initialization failed: %s" % e}
    
        if client is None:
            return {"error": "Azure LogAnalytics client is not initialized"}
    
        try:
            # List all saved searches in the workspace
            saved_searches_result = await run_in_thread(
                client.saved_searches.list_by_workspace,
                resource_group_name=resource_group,
                workspace_name=workspace_name,
            )
    
            # Log the result to understand its structure
            self.logger.info(
                "Saved searches result type: %s", type(saved_searches_result)
            )
    
            result = []
            # Check if the result has a 'value' attribute which is the actual list
            if hasattr(saved_searches_result, "value"):
                # Treat a None value as an empty list so the loop below is safe
                saved_searches = saved_searches_result.value or []
            else:
                # If not, fall back to a 'saved_searches' attribute (empty if absent)
                saved_searches = (
                    getattr(saved_searches_result, "saved_searches", None) or []
                )
    
            self.logger.info(
                "Processing %s saved searches",
                len(saved_searches) if saved_searches else 0,
            )
    
            for search in saved_searches:
                # Create a basic info dictionary with guaranteed attributes
                search_info = {
                    "id": search.id if hasattr(search, "id") else None,
                    "name": search.name if hasattr(search, "name") else None,
                    "type": search.type if hasattr(search, "type") else None,
                }
    
                # Try to access properties directly from the search object first
                try:
                    # Check for direct properties on the search object
                    properties_to_check = [
                        "category",
                        "display_name",
                        "query",
                        "function_alias",
                        "function_parameters",
                        "version",
                        "tags",
                        "etag",
                        "time_created",
                        "time_modified",
                    ]
    
                    for prop_name in properties_to_check:
                        if hasattr(search, prop_name):
                            value = getattr(search, prop_name)
                            if value is not None:
                                # Convert snake_case to camelCase for consistency in the output
                                key = "".join(
                                    [
                                        x.capitalize() if i > 0 else x
                                        for i, x in enumerate(prop_name.split("_"))
                                    ]
                                )
                                search_info[key] = value
    
                    # If we couldn't find any direct properties, try the nested properties approach
                    if len(search_info) <= 3 and hasattr(search, "properties"):
                        props = search.properties
                        if hasattr(props, "category"):
                            search_info["category"] = props.category
                        if hasattr(props, "display_name"):
                            search_info["displayName"] = props.display_name
                        if hasattr(props, "query"):
                            search_info["query"] = props.query
                        if hasattr(props, "function_alias"):
                            search_info["functionAlias"] = props.function_alias
                        if hasattr(props, "version"):
                            search_info["version"] = props.version
                        if hasattr(props, "tags"):
                            search_info["tags"] = props.tags
                        if hasattr(props, "etag"):
                            search_info["etag"] = props.etag
                except Exception as prop_error:
                    # Log the property access error but continue with basic details
                    self.logger.error(
                        "Error accessing saved search properties: %s", prop_error
                    )
    
                result.append(search_info)
    
            return {"savedSearches": result, "count": len(result), "valid": True}
        except Exception as e:
            self.logger.error("Error retrieving saved searches: %s", e)
            return {"error": "Error retrieving saved searches: %s" % str(e)}
  • Registers the tool with the MCP server instance.
    LogAnalyticsSavedSearchesListTool.register(mcp)
  • Tool metadata: name and description used for schema/registration.
    name = "log_analytics_saved_searches_list"
    description = "List all saved searches in a Log Analytics workspace"
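
The property handling inside run can be looked at in isolation. The following is a minimal sketch, not the server's own code: `snake_to_camel`, `summarize_search`, and the `SimpleNamespace` stand-in for an SDK saved-search object are hypothetical names introduced for illustration. Only the property list and the snake_case to camelCase rule are taken from the method above.

```python
from types import SimpleNamespace

# Property names copied from the run method's properties_to_check list.
PROPERTIES_TO_CHECK = [
    "category",
    "display_name",
    "query",
    "function_alias",
    "function_parameters",
    "version",
    "tags",
    "etag",
    "time_created",
    "time_modified",
]


def snake_to_camel(name: str) -> str:
    """Convert snake_case to camelCase, keeping the first segment unchanged."""
    first, *rest = name.split("_")
    return first + "".join(part.capitalize() for part in rest)


def summarize_search(search) -> dict:
    """Build the output dictionary for one saved-search object.

    Missing attributes and None values are skipped, as in the run method.
    """
    info = {
        "id": getattr(search, "id", None),
        "name": getattr(search, "name", None),
        "type": getattr(search, "type", None),
    }
    for prop_name in PROPERTIES_TO_CHECK:
        value = getattr(search, prop_name, None)
        if value is not None:
            info[snake_to_camel(prop_name)] = value
    return info


# Hypothetical stand-in object; real objects come from the Azure SDK.
example = SimpleNamespace(
    id="example-id",
    name="example-search",
    type="savedSearches",
    display_name="Failed sign-ins",
    query="SigninLogs | take 10",
    category=None,  # None values are left out of the result
)
```

Calling `summarize_search(example)` yields a dictionary with the keys `id`, `name`, `type`, `displayName`, and `query`; `category` is omitted because its value is None. Using `getattr` with a default replaces the paired `hasattr`/`getattr` calls without changing which keys end up in the output.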

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dstreefkerk/ms-sentinel-mcp-server'
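
For callers working in Python rather than a shell, the same request could be sketched with the standard library. The URL comes from the curl command above; the helper names `build_server_url` and `fetch_server_info` are hypothetical, and the assumption that the response body is JSON is not confirmed by this page.

```python
import json
import urllib.request
from urllib.parse import quote

# Base URL taken from the curl example.
API_BASE = "https://glama.ai/api/mcp/v1/servers"


def build_server_url(owner: str, name: str) -> str:
    """Build the directory URL for one server, escaping each path segment."""
    return f"{API_BASE}/{quote(owner, safe='')}/{quote(name, safe='')}"


def fetch_server_info(owner: str, name: str, timeout: float = 10.0):
    """Send the GET request and decode the body.

    Assumption: the API responds with a JSON document.
    """
    request = urllib.request.Request(build_server_url(owner, name), method="GET")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

`fetch_server_info("dstreefkerk", "ms-sentinel-mcp-server")` would issue the same GET request as the curl command. Network and HTTP errors surface as `urllib.error.URLError` and are left for the caller to handle.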

If you have feedback or need assistance with the MCP directory API, please join our Discord server.