Skip to main content
Glama
dstreefkerk

ms-sentinel-mcp-server

by dstreefkerk
workspace_tools.py25.1 kB
""" MCP-compliant tools for retrieving and managing Microsoft Sentinel workspace information. This module provides tools for querying Sentinel workspace details, source controls, metadata, and ML analytics settings. All tools are implemented using MCPToolBase and are compatible with both MCP server and direct invocation. """ from mcp.server.fastmcp import Context from tools.base import MCPToolBase from utilities.task_manager import run_in_thread class SentinelWorkspaceGetTool(MCPToolBase): """ Tool for retrieving detailed information about the current Sentinel Log Analytics workspace. """ name = "sentinel_workspace_get" description = "Get workspace information (refactored, MCP-compliant)" async def run(self, ctx: Context, **kwargs): """ Get detailed information about the current Sentinel Log Analytics workspace. Returns: dict: { 'workspace_name': str, 'resource_group': str, 'subscription_id': str, 'properties': dict, # workspace properties or empty if unavailable 'additional_information': list[str], 'error': str (optional, present only if an error occurs) } Output Fields: - workspace_name: The name of the Sentinel Log Analytics workspace. - resource_group: The Azure resource group for the workspace. - subscription_id: The Azure subscription ID. - properties: Detailed properties about the workspace (location, SKU, retention, etc.). - additional_information: Guidance on related tools and next steps. - error: Error message if an error occurs (optional). Error cases will always include an 'error' key for testability. Parameters are extracted from both kwargs and kwargs['kwargs'] for MCP compatibility. Azure Context Fallback: - Supports both MCP server and direct invocation. - If ctx.request_context is not available, falls back to environment variables for Azure context: AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_SUBSCRIPTION_ID, AZURE_RESOURCE_GROUP, AZURE_WORKSPACE_NAME. """ logger = self.logger # Extract parameters from both kwargs and kwargs['kwargs'] (future-proof, # even if unused) params = dict(kwargs) if "kwargs" in kwargs and isinstance(kwargs["kwargs"], dict): params.update(kwargs["kwargs"]) # Extract context (assume .env is loaded and Context is configured) workspace_name, resource_group, subscription_id = self.get_azure_context(ctx) result = { "workspace_name": workspace_name, "resource_group": resource_group, "subscription_id": subscription_id, "properties": {}, "additional_information": [ "For data connector details, use the `sentinel_connectors_list` tool.", "For analytics rules details, use the `list_analytics_rules` tool.", ], } try: client = self.get_loganalytics_client(subscription_id) ws = await run_in_thread( client.workspaces.get, resource_group, workspace_name ) result["properties"] = { "location": ws.location, "sku": getattr(ws.sku, "name", None), "sku_description": getattr(ws.sku, "description", None), "last_sku_update": str(getattr(ws, "last_sku_update", "")), "retention_period_days": ws.retention_in_days, "daily_quota_gb": getattr(ws, "daily_quota_gb", None), "quota_reset_time": str(getattr(ws, "quota_reset_time", "")), "ingestion_status": getattr(ws, "ingestion_status", None), "public_network_access_ingestion": getattr( ws, "public_network_access_for_ingestion", None ), "public_network_access_query": getattr( ws, "public_network_access_for_query", None ), "created": str(getattr(ws, "created_date", "")), "last_modified": str(getattr(ws, "modified_date", "")), "features": getattr(ws, "features", {}), } except Exception as ex: error_msg = "Error retrieving workspace info: %s" % ex logger.exception("%s", error_msg) result["error"] = error_msg return result class SentinelSourceControlsListTool(MCPToolBase): """ Tool for listing all Sentinel source controls in the current workspace. """ name = "sentinel_source_controls_list" description = "List all Sentinel source controls in the current workspace." async def run(self, ctx: Context, **kwargs): """ List all source controls in the current Sentinel workspace. Parameters: None required. (Context is extracted from MCP or environment.) Returns: dict: { 'source_controls': list[dict], 'valid': bool, 'errors': list[str], 'error': str (optional, present only if an error occurs) } Output Fields: - source_controls: List of source control objects (id, name, repo, etc.) - valid: True if successful, False otherwise - errors: List of error messages (empty if none) - error: Error message if an error occurs (optional) Error cases will always include an 'error' key for testability. """ logger = self.logger params = dict(kwargs) if "kwargs" in kwargs and isinstance(kwargs["kwargs"], dict): params.update(kwargs["kwargs"]) workspace_name, resource_group, subscription_id = self.get_azure_context(ctx) result = { "source_controls": [], "valid": False, "errors": [], } try: client = self.get_securityinsight_client(subscription_id) controls = client.source_controls.list(resource_group, workspace_name) controls_list = [] for ctrl in controls: controls_list.append( { "id": getattr(ctrl, "id", None), "name": getattr(ctrl, "name", None), "repo_type": getattr(ctrl, "repo_type", None), "repo_url": getattr(ctrl, "repo_url", None), "description": getattr(ctrl, "description", None), "content_types": getattr(ctrl, "content_types", None), "created_time_utc": str(getattr(ctrl, "created_time_utc", "")), "last_modified_time_utc": str( getattr(ctrl, "last_modified_time_utc", "") ), } ) result["source_controls"] = controls_list result["valid"] = True except Exception as ex: error_msg = "Error listing source controls: %s" % ex logger.exception("%s", error_msg) result["error"] = error_msg result["errors"].append(error_msg) return result class SentinelSourceControlGetTool(MCPToolBase): """ Tool for retrieving details for a specific Sentinel source control by ID. """ name = "sentinel_source_control_get" description = "Get details for a specific Sentinel source control by ID." async def run(self, ctx: Context, **kwargs): """ Get details for a specific source control by ID. Parameters: source_control_id (str, required): The ID of the source control to retrieve. Returns: dict: { 'source_control': dict, 'valid': bool, 'errors': list[str], 'error': str (optional, present only if an error occurs) } Output Fields: - source_control: Source control object (id, name, repo, etc.) - valid: True if successful, False otherwise - errors: List of error messages (empty if none) - error: Error message if an error occurs (optional) Error cases will always include an 'error' key for testability. """ logger = self.logger # Extract parameters using the base class method source_control_id = self._extract_param(kwargs, "source_control_id") result = { "source_control": {}, "valid": False, "errors": [], } if not source_control_id: error_msg = "Missing required parameter: source_control_id" logger.error("%s", error_msg) result["error"] = error_msg result["errors"].append(error_msg) return result workspace_name, resource_group, subscription_id = self.get_azure_context(ctx) try: client = self.get_securityinsight_client(subscription_id) ctrl = client.source_controls.get( resource_group, workspace_name, source_control_id ) result["source_control"] = { "id": getattr(ctrl, "id", None), "name": getattr(ctrl, "name", None), "repo_type": getattr(ctrl, "repo_type", None), "repo_url": getattr(ctrl, "repo_url", None), "description": getattr(ctrl, "description", None), "content_types": getattr(ctrl, "content_types", None), "created_time_utc": str(getattr(ctrl, "created_time_utc", "")), "last_modified_time_utc": str( getattr(ctrl, "last_modified_time_utc", "") ), } result["valid"] = True except Exception as ex: error_msg = "Error retrieving source control: %s" % ex logger.exception("%s", error_msg) result["error"] = error_msg result["errors"].append(error_msg) return result class SentinelMetadataListTool(MCPToolBase): """ Tool for listing all Sentinel metadata in the current workspace. """ name = "sentinel_metadata_list" description = "List all Sentinel metadata in the current workspace." async def run(self, ctx: Context, **kwargs): """ List all metadata in the current Sentinel workspace. Parameters: None required. (Context is extracted from MCP or environment.) Returns: dict: { 'metadata': list[dict], 'valid': bool, 'errors': list[str], 'error': str (optional, present only if an error occurs) } Output Fields: - metadata: List of metadata objects (id, name, kind, etc.) - valid: True if successful, False otherwise - errors: List of error messages (empty if none) - error: Error message if an error occurs (optional) Error cases will always include an 'error' key for testability. """ logger = self.logger workspace_name, resource_group, subscription_id = self.get_azure_context(ctx) result = { "metadata": [], "valid": False, "errors": [], } try: client = self.get_securityinsight_client(subscription_id) metadata_objs = client.metadata.list(resource_group, workspace_name) metadata_list = [] for meta in metadata_objs: metadata_list.append( { "id": getattr(meta, "id", None), "name": getattr(meta, "name", None), "kind": getattr(meta, "kind", None), "content_id": getattr(meta, "content_id", None), "version": getattr(meta, "version", None), "parent_id": getattr(meta, "parent_id", None), "author": getattr(meta, "author", None), "source": getattr(meta, "source", None), "support": getattr(meta, "support", None), "categories": getattr(meta, "categories", None), "dependencies": getattr(meta, "dependencies", None), "created": str(getattr(meta, "created", "")), "last_modified": str(getattr(meta, "last_modified", "")), } ) result["metadata"] = metadata_list result["valid"] = True except Exception as ex: error_msg = "Error listing metadata: %s" % ex logger.exception("%s", error_msg) result["error"] = error_msg result["errors"].append(error_msg) return result class SentinelMetadataGetTool(MCPToolBase): """ Tool for retrieving details for specific Sentinel metadata by ID. """ name = "sentinel_metadata_get" description = "Get details for specific Sentinel metadata by ID." async def run(self, ctx: Context, **kwargs): """ Get details for specific metadata by ID. Parameters: metadata_id (str, required): The ID of the metadata object to retrieve. Returns: dict: { 'metadata': dict, 'valid': bool, 'errors': list[str], 'error': str (optional, present only if an error occurs) } Output Fields: - metadata: Metadata object (id, name, kind, etc.) - valid: True if successful, False otherwise - errors: List of error messages (empty if none) - error: Error message if an error occurs (optional) Error cases will always include an 'error' key for testability. """ logger = self.logger # Accept both 'metadata_id' and 'id' as input keys using the base class method metadata_id = self._extract_param(kwargs, "metadata_id") or self._extract_param( kwargs, "id" ) logger.debug("SentinelMetadataGetTool metadata_id: %r", metadata_id) # If a full ARM resource ID is provided, extract the short name (last segment) if metadata_id and "/" in metadata_id: metadata_id = metadata_id.rstrip("/").split("/")[-1] result = { "metadata": {}, "valid": False, "errors": [], } if not metadata_id: error_msg = ( "Missing required parameter: metadata_id or id. Provide either " "the short name or the full ARM resource ID." ) logger.error("%s", error_msg) result["error"] = error_msg result["errors"].append(error_msg) return result workspace_name, resource_group, subscription_id = self.get_azure_context(ctx) try: client = self.get_securityinsight_client(subscription_id) meta = client.metadata.get(resource_group, workspace_name, metadata_id) def _serialize_model(obj): if hasattr(obj, "as_dict"): return obj.as_dict() elif hasattr(obj, "__dict__"): # fallback, filter out private attributes return { k: v for k, v in obj.__dict__.items() if not k.startswith("_") } elif obj is None: return None else: return str(obj) result["metadata"] = { "id": getattr(meta, "id", None), "name": getattr(meta, "name", None), "kind": getattr(meta, "kind", None), "content_id": getattr(meta, "content_id", None), "version": getattr(meta, "version", None), "parent_id": getattr(meta, "parent_id", None), "author": _serialize_model(getattr(meta, "author", None)), "source": _serialize_model(getattr(meta, "source", None)), "support": _serialize_model(getattr(meta, "support", None)), "categories": getattr(meta, "categories", None), "dependencies": getattr(meta, "dependencies", None), "created": str(getattr(meta, "created", "")), "last_modified": str(getattr(meta, "last_modified", "")), } result["valid"] = True except Exception as ex: error_msg = f"Error retrieving metadata: {ex}" logger.exception(error_msg) result["error"] = error_msg result["errors"].append(error_msg) return result class SentinelMLAnalyticsSettingsListTool(MCPToolBase): """Tool for listing all Sentinel ML analytics settings in the current workspace.""" name = "sentinel_ml_analytics_settings_list" description = "List all Sentinel ML analytics settings in the current workspace." async def run(self, ctx: Context, **kwargs): """ List all ML analytics settings in the current Sentinel workspace. Returns MCP-compliant dict with 'settings', 'valid', 'errors', and 'error'. """ logger = self.logger result = {"settings": [], "valid": False, "errors": []} workspace_name, resource_group, subscription_id = self.get_azure_context(ctx) if not (workspace_name and resource_group and subscription_id): error_msg = ( "Missing required Azure context (workspace_name, resource_group, " "subscription_id)." ) logger.error(error_msg) result["error"] = error_msg result["errors"].append(error_msg) return result try: client = self.get_securityinsight_client(subscription_id) # Use the preview API version for ML Analytics support ml_settings_paged = client.security_ml_analytics_settings.list( resource_group, workspace_name ) settings = [] for s in ml_settings_paged: s_dict = s.as_dict() if hasattr(s, "as_dict") else dict(s) settings.append( { "id": s_dict.get("id"), "name": s_dict.get("name"), "description": s_dict.get("description"), "enabled": s_dict.get("enabled"), } ) result["settings"] = settings result["valid"] = True except Exception as ex: error_msg = f"Error listing ML analytics settings: {ex}" logger.exception(error_msg) result["error"] = error_msg result["errors"].append(error_msg) return result class SentinelMLAnalyticsSettingGetTool(MCPToolBase): """ Tool for retrieving a specific Sentinel ML analytics setting by name. """ name = "sentinel_ml_analytics_setting_get" description = "Get a specific Sentinel ML analytics setting by name." async def run(self, ctx: Context, **kwargs): """ Get a specific ML analytics setting by name. Parameters: setting_name (str, required): The name of the ML analytics setting. Returns MCP-compliant dict with 'setting', 'valid', 'errors', and 'error'. """ logger = self.logger # Extract parameters using the base class method setting_name = self._extract_param(kwargs, "setting_name") result = {"setting": {}, "valid": False, "errors": []} if not setting_name: error_msg = "Missing required parameter: setting_name" logger.error(error_msg) result["error"] = error_msg result["errors"].append(error_msg) return result workspace_name, resource_group, subscription_id = self.get_azure_context(ctx) if not (workspace_name and resource_group and subscription_id): error_msg = ( "Missing required Azure context (workspace_name, resource_group, " "subscription_id)." ) logger.error(error_msg) result["error"] = error_msg result["errors"].append(error_msg) return result try: client = self.get_securityinsight_client(subscription_id) s = client.security_ml_analytics_settings.get( resource_group, workspace_name, setting_name ) s_dict = s.as_dict() if hasattr(s, "as_dict") else dict(s) enriched = { "id": s_dict.get("id"), "name": s_dict.get("name"), "kind": s_dict.get("kind"), "etag": s_dict.get("etag"), "type": s_dict.get("type"), "description": s_dict.get("description"), "display_name": s_dict.get("display_name"), "enabled": s_dict.get("enabled"), "last_modified_utc": s_dict.get("last_modified_utc"), "required_data_connectors": s_dict.get("required_data_connectors"), "tactics": s_dict.get("tactics"), "techniques": s_dict.get("techniques"), "anomaly_version": s_dict.get("anomaly_version"), "customizable_observations": s_dict.get("customizable_observations"), "frequency": s_dict.get("frequency"), "settings_status": s_dict.get("settings_status"), "is_default_settings": s_dict.get("is_default_settings"), "anomaly_settings_version": s_dict.get("anomaly_settings_version"), "settings_definition_id": s_dict.get("settings_definition_id"), "properties": None, "referenced_by_analytic_rules": [], } # Parse 'properties' if present props = getattr(s, "properties", None) if props is not None: if hasattr(props, "as_dict"): enriched["properties"] = props.as_dict() elif isinstance(props, dict): enriched["properties"] = props else: enriched["properties"] = {"raw": str(props)} # Attempt to find analytic rules that reference this ML setting analytic_rules = [] for rule in client.alert_rules.list(resource_group, workspace_name): rule_dict = rule.as_dict() if hasattr(rule, "as_dict") else dict(rule) found_ref = False for val in rule_dict.values(): if isinstance(val, str) and ( enriched["name"] in val or enriched["id"] in val ): found_ref = True elif isinstance(val, dict): if any( enriched["name"] in str(v) or enriched["id"] in str(v) for v in val.values() ): found_ref = True elif isinstance(val, list): if any( enriched["name"] in str(v) or enriched["id"] in str(v) for v in val ): found_ref = True if found_ref: analytic_rules.append( { "rule_name": rule_dict.get( "display_name", rule_dict.get("name") ), "rule_id": rule_dict.get("id"), "rule_kind": rule_dict.get("kind"), } ) enriched["referenced_by_analytic_rules"] = analytic_rules result["setting"] = enriched result["valid"] = True except Exception as ex: error_msg = f"Error retrieving ML analytics setting: {ex}" logger.exception(error_msg) result["error"] = error_msg result["errors"].append(error_msg) return result def register_tools(mcp): """Register all Sentinel workspace-related tools with the MCP server instance.""" SentinelWorkspaceGetTool.register(mcp) SentinelSourceControlsListTool.register(mcp) SentinelSourceControlGetTool.register(mcp) SentinelMetadataListTool.register(mcp) SentinelMetadataGetTool.register(mcp) SentinelMLAnalyticsSettingsListTool.register(mcp) SentinelMLAnalyticsSettingGetTool.register(mcp)

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dstreefkerk/ms-sentinel-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server