
Wazuh MCP Server

by gensecaihq
cluster.py (70.8 kB)
"""Cluster and manager monitoring tools for Wazuh MCP Server.""" from typing import Any, Dict, List import mcp.types as types from datetime import datetime, timedelta from collections import defaultdict, Counter import re from .base import BaseTool from ..utils import validate_cluster_health_query, validate_manager_error_logs_query class ClusterTools(BaseTool): """Tools for Wazuh cluster monitoring, health checking, and manager log analysis.""" @property def tool_definitions(self) -> List[types.Tool]: """Return cluster and manager-related tool definitions.""" return [ types.Tool( name="get_wazuh_cluster_health", description="Get comprehensive cluster diagnostics with node health and performance analysis", inputSchema={ "type": "object", "properties": { "include_performance": { "type": "boolean", "description": "Include detailed performance metrics for all nodes", "default": True }, "include_sync_status": { "type": "boolean", "description": "Include cluster synchronization status analysis", "default": True }, "health_threshold": { "type": "number", "description": "Health score threshold (0-100) for alerts", "default": 80, "minimum": 0, "maximum": 100 }, "include_historical": { "type": "boolean", "description": "Include historical health trends", "default": False }, "node_filter": { "type": "array", "items": {"type": "string"}, "description": "Filter by specific node names (optional)" } } } ), types.Tool( name="get_wazuh_cluster_nodes", description="Get individual node monitoring and detailed health analysis for each cluster member", inputSchema={ "type": "object", "properties": { "node_name": { "type": "string", "description": "Specific node name to analyze (optional, analyzes all if not provided)" }, "include_stats": { "type": "boolean", "description": "Include detailed node statistics", "default": True }, "include_logs": { "type": "boolean", "description": "Include recent log analysis for nodes", "default": False }, "performance_window_hours": { "type": "integer", "description": "Performance analysis window in hours", "default": 24, "minimum": 1, "maximum": 168 } } } ), types.Tool( name="search_wazuh_manager_logs", description="Enhanced forensic log search with timeline reconstruction and pattern analysis", inputSchema={ "type": "object", "properties": { "search_pattern": { "type": "string", "description": "Search pattern or keyword (supports regex)", "default": "" }, "log_level": { "type": "array", "items": {"type": "string"}, "description": "Filter by log levels", "default": ["error", "warning", "info", "debug"] }, "time_range_hours": { "type": "integer", "description": "Time range in hours to search", "default": 24, "minimum": 1, "maximum": 168 }, "max_results": { "type": "integer", "description": "Maximum number of results to return", "default": 100, "minimum": 1, "maximum": 1000 }, "include_context": { "type": "boolean", "description": "Include surrounding log context for matches", "default": True }, "timeline_analysis": { "type": "boolean", "description": "Generate timeline reconstruction", "default": False } } } ), types.Tool( name="get_wazuh_manager_error_logs", description="Get manager error logs with root cause analysis and trend detection", inputSchema={ "type": "object", "properties": { "severity_filter": { "type": "array", "items": {"type": "string"}, "description": "Filter by error severity levels", "default": ["critical", "error", "warning"] }, "time_range_hours": { "type": "integer", "description": "Time range in hours for analysis", "default": 24, "minimum": 1, "maximum": 
168 }, "include_root_cause": { "type": "boolean", "description": "Include automated root cause analysis", "default": True }, "include_trends": { "type": "boolean", "description": "Include error trend analysis", "default": True }, "group_similar": { "type": "boolean", "description": "Group similar errors together", "default": True }, "max_errors": { "type": "integer", "description": "Maximum number of errors to analyze", "default": 500, "minimum": 10, "maximum": 2000 } } } ) ] def get_handler_mapping(self) -> Dict[str, callable]: """Return mapping of tool names to handler methods.""" return { "get_wazuh_cluster_health": self.handle_cluster_health, "get_wazuh_cluster_nodes": self.handle_cluster_nodes, "search_wazuh_manager_logs": self.handle_manager_logs_search, "get_wazuh_manager_error_logs": self.handle_manager_error_logs } async def handle_cluster_health(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle cluster health monitoring request.""" try: # Validate input validated_args = validate_cluster_health_query(arguments) include_performance = validated_args.get("include_performance", True) include_sync_status = validated_args.get("include_sync_status", True) health_threshold = validated_args.get("health_threshold", 80) include_historical = validated_args.get("include_historical", False) node_filter = validated_args.get("node_filter") # Get cluster information cluster_status = await self._get_cluster_status() cluster_nodes = await self._get_cluster_nodes(node_filter) # Calculate overall cluster health cluster_health_score = self._calculate_cluster_health(cluster_nodes, cluster_status) # Generate comprehensive analysis analysis = { "overview": { "cluster_enabled": cluster_status.get("enabled", False), "cluster_health_score": cluster_health_score, "health_status": self._get_health_status_label(cluster_health_score), "total_nodes": len(cluster_nodes), "healthy_nodes": len([n for n in cluster_nodes if n.get("health_score", 0) >= health_threshold]), "analysis_timestamp": datetime.utcnow().isoformat() }, "nodes_summary": self._analyze_nodes_summary(cluster_nodes), "connectivity": self._analyze_cluster_connectivity(cluster_nodes), "load_distribution": self._analyze_load_distribution(cluster_nodes) } if include_performance: analysis["performance_metrics"] = await self._get_cluster_performance_metrics(cluster_nodes) if include_sync_status: analysis["synchronization"] = await self._analyze_sync_status(cluster_nodes) if include_historical: analysis["trends"] = await self._get_cluster_trends() # Generate alerts and recommendations analysis["alerts"] = self._generate_cluster_alerts(analysis, health_threshold) analysis["recommendations"] = self._generate_cluster_recommendations(analysis) return self._format_response(analysis, metadata={ "source": "wazuh_cluster", "analysis_type": "comprehensive_cluster_health", "health_threshold": health_threshold }) except Exception as e: self.logger.error(f"Error in cluster health analysis: {str(e)}") return self._format_error_response(e, {"operation": "get_wazuh_cluster_health"}) async def handle_cluster_nodes(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle individual node monitoring request.""" try: node_name = arguments.get("node_name") include_stats = arguments.get("include_stats", True) include_logs = arguments.get("include_logs", False) performance_window_hours = arguments.get("performance_window_hours", 24) # Get node information if node_name: nodes = await self._get_specific_node_info(node_name) else: nodes = await 
self._get_cluster_nodes() # Enrich node data enriched_nodes = [] for node in nodes: enriched_node = await self._enrich_node_data( node, include_stats, include_logs, performance_window_hours ) enriched_nodes.append(enriched_node) # Generate node analysis analysis = { "nodes": enriched_nodes, "summary": self._generate_nodes_summary(enriched_nodes), "performance_comparison": self._compare_node_performance(enriched_nodes), "role_analysis": self._analyze_node_roles(enriched_nodes) } if len(enriched_nodes) > 1: analysis["cluster_balance"] = self._analyze_cluster_balance(enriched_nodes) return self._format_response(analysis, metadata={ "source": "wazuh_cluster_nodes", "analysis_type": "individual_node_monitoring", "target_node": node_name or "all_nodes" }) except Exception as e: self.logger.error(f"Error in cluster nodes analysis: {str(e)}") return self._format_error_response(e, {"operation": "get_wazuh_cluster_nodes"}) async def handle_manager_logs_search(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle manager logs search with forensic capabilities.""" try: search_pattern = arguments.get("search_pattern", "") log_level = arguments.get("log_level", ["error", "warning", "info", "debug"]) time_range_hours = arguments.get("time_range_hours", 24) max_results = arguments.get("max_results", 100) include_context = arguments.get("include_context", True) timeline_analysis = arguments.get("timeline_analysis", False) # Get manager logs logs = await self._search_manager_logs( search_pattern, log_level, time_range_hours, max_results ) # Process and analyze logs processed_logs = self._process_log_entries(logs, include_context) analysis = { "search_results": { "total_matches": len(processed_logs), "search_pattern": search_pattern, "time_range_hours": time_range_hours, "log_entries": processed_logs[:max_results] }, "patterns": self._analyze_log_patterns(processed_logs), "frequency_analysis": self._analyze_log_frequency(processed_logs), "severity_distribution": self._analyze_severity_distribution(processed_logs) } if timeline_analysis: analysis["timeline"] = self._generate_timeline_reconstruction(processed_logs) # Generate insights analysis["insights"] = self._generate_log_insights(processed_logs, search_pattern) return self._format_response(analysis, metadata={ "source": "wazuh_manager_logs", "analysis_type": "forensic_log_search", "search_query": search_pattern }) except Exception as e: self.logger.error(f"Error in manager logs search: {str(e)}") return self._format_error_response(e, {"operation": "search_wazuh_manager_logs"}) async def handle_manager_error_logs(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle manager error logs analysis with root cause detection.""" try: # Validate input validated_args = validate_manager_error_logs_query(arguments) severity_filter = validated_args.get("severity_filter", ["critical", "error", "warning"]) time_range_hours = validated_args.get("time_range_hours", 24) include_root_cause = validated_args.get("include_root_cause", True) include_trends = validated_args.get("include_trends", True) group_similar = validated_args.get("group_similar", True) max_errors = validated_args.get("max_errors", 500) # Get error logs error_logs = await self._get_manager_error_logs( severity_filter, time_range_hours, max_errors ) # Process and group errors if group_similar: grouped_errors = self._group_similar_errors(error_logs) else: grouped_errors = {"ungrouped": error_logs} analysis = { "overview": { "total_errors": len(error_logs), "unique_error_types": 
len(grouped_errors), "time_range_hours": time_range_hours, "severity_distribution": Counter( log.get("level", "unknown") for log in error_logs ) }, "error_groups": self._analyze_error_groups(grouped_errors), "critical_errors": self._identify_critical_errors(error_logs), "error_patterns": self._analyze_error_patterns(error_logs) } if include_root_cause: analysis["root_cause_analysis"] = self._perform_root_cause_analysis(grouped_errors) if include_trends: analysis["trends"] = self._analyze_error_trends(error_logs, time_range_hours) # Generate actionable recommendations analysis["remediation"] = self._generate_error_remediation(analysis) return self._format_response(analysis, metadata={ "source": "wazuh_manager_error_logs", "analysis_type": "comprehensive_error_analysis", "severity_filters": severity_filter }) except Exception as e: self.logger.error(f"Error in manager error logs analysis: {str(e)}") return self._format_error_response(e, {"operation": "get_wazuh_manager_error_logs"}) # Helper methods for cluster health async def _get_cluster_status(self) -> Dict[str, Any]: """Get cluster status information.""" try: cluster_response = await self.api_client.get_cluster_status() return cluster_response.get("data", {}) except Exception as e: self.logger.warning(f"Could not get cluster status: {str(e)}") return {"enabled": False, "error": str(e)} async def _get_cluster_nodes(self, node_filter: List[str] = None) -> List[Dict[str, Any]]: """Get cluster nodes information.""" try: nodes_response = await self.api_client.get_cluster_nodes() nodes = nodes_response.get("data", {}).get("affected_items", []) if node_filter: nodes = [node for node in nodes if node.get("name") in node_filter] # Enrich nodes with health scores for node in nodes: node["health_score"] = self._calculate_node_health_score(node) node["role"] = self._determine_node_role(node) return nodes except Exception as e: self.logger.error(f"Could not get cluster nodes: {str(e)}") return [] async def _get_specific_node_info(self, node_name: str) -> List[Dict[str, Any]]: """Get information for a specific node.""" try: node_response = await self.api_client.get_cluster_node_info(node_name) node_data = node_response.get("data", {}) return [node_data] if node_data else [] except Exception as e: self.logger.error(f"Could not get node {node_name} info: {str(e)}") return [] def _calculate_cluster_health(self, nodes: List[Dict[str, Any]], cluster_status: Dict[str, Any]) -> int: """Calculate overall cluster health score.""" if not cluster_status.get("enabled", False): return 0 # No cluster = no cluster health if not nodes: return 0 # No nodes = no health # Calculate average node health node_scores = [node.get("health_score", 0) for node in nodes] avg_node_health = sum(node_scores) / len(node_scores) # Adjust for cluster-specific factors cluster_health = avg_node_health # Penalty for missing nodes (assuming standard 3-node cluster) expected_nodes = 3 if len(nodes) < expected_nodes: penalty = (expected_nodes - len(nodes)) * 15 cluster_health = max(0, cluster_health - penalty) # Bonus for all nodes being healthy healthy_nodes = sum(1 for score in node_scores if score >= 80) if healthy_nodes == len(nodes) and len(nodes) >= 2: cluster_health = min(100, cluster_health + 5) return int(cluster_health) def _calculate_node_health_score(self, node: Dict[str, Any]) -> int: """Calculate health score for individual node.""" score = 0 # Connection status (40 points) status = node.get("status", "").lower() if status == "connected": score += 40 elif status == 
"disconnected": score += 0 else: score += 20 # Unknown/other status # Node type and role (20 points) node_type = node.get("type", "").lower() if node_type == "master": score += 20 elif node_type == "worker": score += 15 else: score += 10 # Last keep alive (20 points) last_keep_alive = node.get("lastKeepAlive") if last_keep_alive: try: last_time = datetime.fromisoformat(last_keep_alive.replace("Z", "+00:00")) minutes_ago = (datetime.utcnow() - last_time.replace(tzinfo=None)).total_seconds() / 60 if minutes_ago <= 5: score += 20 elif minutes_ago <= 15: score += 15 elif minutes_ago <= 60: score += 10 else: score += 0 except: score += 5 # Version consistency (10 points) version = node.get("version") if version: score += 10 # Performance indicators (10 points) # This would include CPU, memory, disk usage if available stats = node.get("stats", {}) if stats: score += 10 return min(score, 100) def _get_health_status_label(self, health_score: int) -> str: """Get health status label from score.""" if health_score >= 90: return "excellent" elif health_score >= 80: return "good" elif health_score >= 60: return "fair" elif health_score >= 40: return "poor" else: return "critical" def _determine_node_role(self, node: Dict[str, Any]) -> str: """Determine the role of a cluster node.""" node_type = node.get("type", "").lower() if node_type == "master": return "master" elif node_type == "worker": return "worker" else: return "unknown" def _analyze_nodes_summary(self, nodes: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze summary statistics for cluster nodes.""" if not nodes: return {"total": 0, "message": "No nodes found"} status_counts = Counter(node.get("status", "unknown") for node in nodes) role_counts = Counter(node.get("role", "unknown") for node in nodes) health_scores = [node.get("health_score", 0) for node in nodes] return { "total_nodes": len(nodes), "status_distribution": dict(status_counts), "role_distribution": dict(role_counts), "health_metrics": { "average_health": sum(health_scores) / len(health_scores), "healthy_nodes": sum(1 for score in health_scores if score >= 80), "unhealthy_nodes": sum(1 for score in health_scores if score < 60) }, "master_nodes": sum(1 for node in nodes if node.get("role") == "master"), "worker_nodes": sum(1 for node in nodes if node.get("role") == "worker") } def _analyze_cluster_connectivity(self, nodes: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze cluster connectivity status.""" connected_nodes = [node for node in nodes if node.get("status") == "connected"] disconnected_nodes = [node for node in nodes if node.get("status") != "connected"] connectivity_health = "excellent" if disconnected_nodes: if len(disconnected_nodes) == len(nodes): connectivity_health = "critical" elif len(disconnected_nodes) > len(nodes) / 2: connectivity_health = "poor" else: connectivity_health = "degraded" return { "connected_nodes": len(connected_nodes), "disconnected_nodes": len(disconnected_nodes), "connectivity_health": connectivity_health, "connectivity_percentage": (len(connected_nodes) / len(nodes)) * 100 if nodes else 0, "problematic_nodes": [ { "name": node.get("name"), "status": node.get("status"), "last_seen": node.get("lastKeepAlive") } for node in disconnected_nodes ] } def _analyze_load_distribution(self, nodes: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze load distribution across cluster nodes.""" # Mock load analysis (would use real metrics in production) load_data = [] for node in nodes: # Simulate load data based on node health and status base_load = 50 
if node.get("status") == "connected" else 0 health_score = node.get("health_score", 0) # Higher health generally means handling more load efficiently simulated_load = base_load + (health_score / 5) load_data.append({ "node_name": node.get("name"), "cpu_usage": min(simulated_load + 10, 100), "memory_usage": min(simulated_load, 100), "network_usage": min(simulated_load - 10, 100) }) # Calculate load balance score if load_data: cpu_loads = [data["cpu_usage"] for data in load_data] load_variance = sum((x - sum(cpu_loads)/len(cpu_loads))**2 for x in cpu_loads) / len(cpu_loads) balance_score = max(0, 100 - load_variance) else: balance_score = 0 return { "load_balance_score": int(balance_score), "node_loads": load_data, "average_cpu": sum(data["cpu_usage"] for data in load_data) / len(load_data) if load_data else 0, "load_distribution_health": "good" if balance_score > 70 else "needs_attention" } async def _get_cluster_performance_metrics(self, nodes: List[Dict[str, Any]]) -> Dict[str, Any]: """Get performance metrics for the cluster.""" performance_data = { "cluster_throughput": { "alerts_per_second": 0, "events_per_second": 0, "api_requests_per_minute": 0 }, "resource_usage": { "total_memory_gb": 0, "total_cpu_cores": 0, "storage_usage_gb": 0 }, "response_times": [] } # Aggregate performance data from all nodes for node in nodes: # Mock performance data (would come from real metrics) node_stats = node.get("stats", {}) performance_data["cluster_throughput"]["alerts_per_second"] += node_stats.get("eps", 10) performance_data["resource_usage"]["total_memory_gb"] += node_stats.get("memory_gb", 8) performance_data["resource_usage"]["total_cpu_cores"] += node_stats.get("cpu_cores", 4) performance_data["response_times"].append({ "node": node.get("name"), "avg_response_ms": node_stats.get("response_time_ms", 100) }) return performance_data async def _analyze_sync_status(self, nodes: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze cluster synchronization status.""" # Mock sync analysis (would check actual sync status) sync_status = { "overall_sync_health": "good", "nodes_in_sync": len([n for n in nodes if n.get("status") == "connected"]), "nodes_out_of_sync": 0, "last_sync_times": [], "sync_issues": [] } # Check for potential sync issues connected_nodes = [n for n in nodes if n.get("status") == "connected"] if len(connected_nodes) < len(nodes): sync_status["sync_issues"].append("Some nodes disconnected, sync may be affected") sync_status["overall_sync_health"] = "degraded" if len(connected_nodes) < 2: sync_status["sync_issues"].append("Insufficient nodes for proper synchronization") sync_status["overall_sync_health"] = "critical" return sync_status async def _get_cluster_trends(self) -> Dict[str, Any]: """Get historical cluster health trends.""" # Mock trend data (would come from historical metrics) return { "health_trend": "stable", "performance_trend": "improving", "node_availability": { "last_24h": 98.5, "last_7d": 99.2, "last_30d": 97.8 }, "alerts": [ "Node availability dipped to 95% last week", "Performance improved 15% over last month" ] } def _generate_cluster_alerts(self, analysis: Dict[str, Any], health_threshold: int) -> List[Dict[str, Any]]: """Generate alerts based on cluster analysis.""" alerts = [] # Overall health alerts overview = analysis.get("overview", {}) cluster_health = overview.get("cluster_health_score", 0) if cluster_health < health_threshold: alerts.append({ "level": "critical" if cluster_health < 50 else "warning", "type": "cluster_health", "message": f"Cluster health score 
({cluster_health}) below threshold ({health_threshold})", "recommendation": "Investigate node connectivity and performance issues" }) # Node connectivity alerts connectivity = analysis.get("connectivity", {}) if connectivity.get("disconnected_nodes", 0) > 0: alerts.append({ "level": "warning", "type": "node_connectivity", "message": f"{connectivity['disconnected_nodes']} nodes are disconnected", "affected_nodes": [n["name"] for n in connectivity.get("problematic_nodes", [])], "recommendation": "Check network connectivity and node status" }) # Load balance alerts load_dist = analysis.get("load_distribution", {}) if load_dist.get("load_balance_score", 100) < 70: alerts.append({ "level": "warning", "type": "load_imbalance", "message": "Cluster load is not well distributed", "recommendation": "Review node configurations and workload distribution" }) return alerts def _generate_cluster_recommendations(self, analysis: Dict[str, Any]) -> List[str]: """Generate recommendations based on cluster analysis.""" recommendations = [] overview = analysis.get("overview", {}) nodes_summary = analysis.get("nodes_summary", {}) # Health-based recommendations unhealthy_nodes = nodes_summary.get("health_metrics", {}).get("unhealthy_nodes", 0) if unhealthy_nodes > 0: recommendations.append(f"Address {unhealthy_nodes} unhealthy nodes to improve cluster stability") # Role distribution recommendations master_nodes = nodes_summary.get("master_nodes", 0) if master_nodes < 2: recommendations.append("Consider adding more master nodes for high availability") elif master_nodes > 3: recommendations.append("Consider reducing master nodes to optimize resource usage") # Connectivity recommendations connectivity = analysis.get("connectivity", {}) if connectivity.get("connectivity_percentage", 100) < 100: recommendations.append("Investigate and resolve node connectivity issues") # Performance recommendations load_dist = analysis.get("load_distribution", {}) if load_dist.get("average_cpu", 0) > 80: recommendations.append("Consider scaling cluster due to high CPU usage") return recommendations # Helper methods for node monitoring async def _enrich_node_data(self, node: Dict[str, Any], include_stats: bool, include_logs: bool, performance_window_hours: int) -> Dict[str, Any]: """Enrich node data with additional information.""" enriched = {**node} if include_stats: try: stats_response = await self.api_client.get_cluster_node_stats(node.get("name")) enriched["detailed_stats"] = stats_response.get("data", {}) except Exception as e: enriched["stats_error"] = str(e) if include_logs: try: # Get recent logs for this node logs_response = await self.api_client.get_node_logs( node.get("name"), hours=min(performance_window_hours, 24) ) enriched["recent_logs"] = logs_response.get("data", [])[:10] # Last 10 logs except Exception as e: enriched["logs_error"] = str(e) # Add performance analysis enriched["performance_analysis"] = self._analyze_node_performance(enriched, performance_window_hours) return enriched def _analyze_node_performance(self, node: Dict[str, Any], window_hours: int) -> Dict[str, Any]: """Analyze individual node performance.""" stats = node.get("detailed_stats", {}) # Mock performance analysis (would use real metrics) analysis = { "cpu_efficiency": 85, # Mock value "memory_efficiency": 78, # Mock value "network_performance": 92, # Mock value "disk_performance": 88, # Mock value "overall_performance_score": 0 } # Calculate overall performance score scores = [analysis[key] for key in analysis if key != 
"overall_performance_score"] analysis["overall_performance_score"] = sum(scores) / len(scores) if scores else 0 return analysis def _generate_nodes_summary(self, nodes: List[Dict[str, Any]]) -> Dict[str, Any]: """Generate summary for individual nodes analysis.""" if not nodes: return {"total": 0} performance_scores = [ node.get("performance_analysis", {}).get("overall_performance_score", 0) for node in nodes ] return { "total_nodes_analyzed": len(nodes), "average_performance": sum(performance_scores) / len(performance_scores) if performance_scores else 0, "best_performing_node": max(nodes, key=lambda x: x.get("performance_analysis", {}).get("overall_performance_score", 0)).get("name") if nodes else None, "nodes_needing_attention": [ node.get("name") for node in nodes if node.get("health_score", 0) < 70 ] } def _compare_node_performance(self, nodes: List[Dict[str, Any]]) -> Dict[str, Any]: """Compare performance metrics across nodes.""" if len(nodes) < 2: return {"message": "Need at least 2 nodes for comparison"} performance_comparison = {} metrics = ["cpu_efficiency", "memory_efficiency", "network_performance", "disk_performance"] for metric in metrics: values = [ node.get("performance_analysis", {}).get(metric, 0) for node in nodes ] performance_comparison[metric] = { "average": sum(values) / len(values), "min": min(values), "max": max(values), "variance": sum((x - sum(values)/len(values))**2 for x in values) / len(values) } return performance_comparison def _analyze_node_roles(self, nodes: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze distribution and health of node roles.""" role_analysis = defaultdict(lambda: {"count": 0, "health_scores": [], "performance_scores": []}) for node in nodes: role = node.get("role", "unknown") role_data = role_analysis[role] role_data["count"] += 1 role_data["health_scores"].append(node.get("health_score", 0)) role_data["performance_scores"].append( node.get("performance_analysis", {}).get("overall_performance_score", 0) ) # Calculate averages for each role role_stats = {} for role, data in role_analysis.items(): health_scores = data["health_scores"] perf_scores = data["performance_scores"] role_stats[role] = { "count": data["count"], "average_health": sum(health_scores) / len(health_scores) if health_scores else 0, "average_performance": sum(perf_scores) / len(perf_scores) if perf_scores else 0 } return role_stats def _analyze_cluster_balance(self, nodes: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze overall cluster balance and efficiency.""" health_scores = [node.get("health_score", 0) for node in nodes] performance_scores = [ node.get("performance_analysis", {}).get("overall_performance_score", 0) for node in nodes ] # Calculate balance metrics health_variance = sum((x - sum(health_scores)/len(health_scores))**2 for x in health_scores) / len(health_scores) perf_variance = sum((x - sum(performance_scores)/len(performance_scores))**2 for x in performance_scores) / len(performance_scores) balance_score = max(0, 100 - (health_variance + perf_variance) / 2) return { "balance_score": balance_score, "health_distribution_balance": 100 - health_variance, "performance_distribution_balance": 100 - perf_variance, "balance_status": "good" if balance_score > 70 else "needs_improvement", "recommendations": self._get_balance_recommendations(balance_score, nodes) } def _get_balance_recommendations(self, balance_score: float, nodes: List[Dict[str, Any]]) -> List[str]: """Get recommendations for improving cluster balance.""" recommendations = [] if 
balance_score < 70: # Find underperforming nodes avg_health = sum(node.get("health_score", 0) for node in nodes) / len(nodes) underperforming = [ node.get("name") for node in nodes if node.get("health_score", 0) < avg_health * 0.8 ] if underperforming: recommendations.append(f"Optimize nodes: {', '.join(underperforming)}") recommendations.append("Review cluster configuration for better load distribution") recommendations.append("Consider rebalancing workloads across nodes") return recommendations # Helper methods for log analysis async def _search_manager_logs(self, search_pattern: str, log_levels: List[str], time_range_hours: int, max_results: int) -> List[Dict[str, Any]]: """Search manager logs with given criteria.""" try: # Build search query end_time = datetime.utcnow() start_time = end_time - timedelta(hours=time_range_hours) logs_response = await self.api_client.search_manager_logs( query=search_pattern, levels=log_levels, start_time=start_time.isoformat(), end_time=end_time.isoformat(), limit=max_results ) return logs_response.get("data", {}).get("logs", []) except Exception as e: self.logger.error(f"Error searching manager logs: {str(e)}") # Return mock data for demonstration return self._generate_mock_log_data(search_pattern, log_levels, max_results) def _generate_mock_log_data(self, search_pattern: str, log_levels: List[str], max_results: int) -> List[Dict[str, Any]]: """Generate mock log data for demonstration.""" mock_logs = [] base_time = datetime.utcnow() for i in range(min(max_results, 20)): log_time = base_time - timedelta(minutes=i * 5) level = log_levels[i % len(log_levels)] if log_levels else "info" mock_logs.append({ "timestamp": log_time.isoformat() + "Z", "level": level, "message": f"Sample log message containing {search_pattern}" if search_pattern else f"Sample {level} message {i}", "component": "wazuh-manager" if i % 2 == 0 else "wazuh-remoted", "thread_id": f"thread_{i % 5}", "location": f"src/main.c:{100 + i}" }) return mock_logs def _process_log_entries(self, logs: List[Dict[str, Any]], include_context: bool) -> List[Dict[str, Any]]: """Process and enrich log entries.""" processed = [] for i, log in enumerate(logs): processed_log = { **log, "severity_score": self._calculate_log_severity_score(log), "category": self._categorize_log_entry(log) } if include_context and i > 0: processed_log["context_before"] = logs[i-1].get("message", "") if include_context and i < len(logs) - 1: processed_log["context_after"] = logs[i+1].get("message", "") processed.append(processed_log) return processed def _calculate_log_severity_score(self, log: Dict[str, Any]) -> int: """Calculate severity score for log entry.""" level = log.get("level", "").lower() severity_scores = { "critical": 100, "error": 80, "warning": 60, "info": 40, "debug": 20 } base_score = severity_scores.get(level, 30) # Adjust based on message content message = log.get("message", "").lower() if any(keyword in message for keyword in ["failed", "error", "exception", "critical"]): base_score = min(100, base_score + 20) return base_score def _categorize_log_entry(self, log: Dict[str, Any]) -> str: """Categorize log entry based on content.""" message = log.get("message", "").lower() component = log.get("component", "").lower() if "authentication" in message or "login" in message: return "authentication" elif "connection" in message or "network" in message: return "connectivity" elif "cluster" in message or "sync" in message: return "cluster" elif "database" in message or "db" in message: return "database" elif "agent" 
in message: return "agent_management" elif "rule" in message or "decoder" in message: return "rules_engine" else: return "general" def _analyze_log_patterns(self, logs: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze patterns in log entries.""" patterns = { "most_common_messages": [], "error_patterns": [], "component_activity": Counter(), "hourly_distribution": Counter() } # Analyze message patterns message_counts = Counter() for log in logs: message = log.get("message", "") # Normalize message for pattern detection normalized = re.sub(r'\d+', 'X', message) # Replace numbers with X normalized = re.sub(r'[a-f0-9]{8,}', 'HASH', normalized) # Replace hashes message_counts[normalized] += 1 patterns["most_common_messages"] = [ {"pattern": pattern, "count": count} for pattern, count in message_counts.most_common(10) ] # Analyze component activity for log in logs: component = log.get("component", "unknown") patterns["component_activity"][component] += 1 # Analyze temporal patterns for log in logs: timestamp = log.get("timestamp", "") try: dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) hour_key = dt.strftime("%H:00") patterns["hourly_distribution"][hour_key] += 1 except: continue return { "most_common_messages": patterns["most_common_messages"], "component_activity": dict(patterns["component_activity"]), "hourly_distribution": dict(patterns["hourly_distribution"]), "peak_activity_hour": max(patterns["hourly_distribution"].items(), key=lambda x: x[1])[0] if patterns["hourly_distribution"] else None } def _analyze_log_frequency(self, logs: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze frequency patterns in logs.""" if not logs: return {"total": 0} # Group by time intervals time_buckets = defaultdict(int) for log in logs: timestamp = log.get("timestamp", "") try: dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) # 5-minute buckets bucket = dt.replace(minute=dt.minute - dt.minute % 5, second=0, microsecond=0) time_buckets[bucket] += 1 except: continue # Calculate frequency statistics frequencies = list(time_buckets.values()) avg_frequency = sum(frequencies) / len(frequencies) if frequencies else 0 return { "total_logs": len(logs), "time_span_minutes": len(time_buckets) * 5, "average_logs_per_5min": avg_frequency, "peak_frequency": max(frequencies) if frequencies else 0, "frequency_trend": "increasing" if frequencies and frequencies[-1] > frequencies[0] else "stable" } def _analyze_severity_distribution(self, logs: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze distribution of log severity levels.""" level_counts = Counter(log.get("level", "unknown") for log in logs) severity_scores = [log.get("severity_score", 0) for log in logs] return { "level_distribution": dict(level_counts), "average_severity_score": sum(severity_scores) / len(severity_scores) if severity_scores else 0, "high_severity_logs": sum(1 for score in severity_scores if score >= 80), "critical_logs": sum(1 for score in severity_scores if score >= 90) } def _generate_timeline_reconstruction(self, logs: List[Dict[str, Any]]) -> Dict[str, Any]: """Generate timeline reconstruction from logs.""" # Sort logs by timestamp sorted_logs = sorted(logs, key=lambda x: x.get("timestamp", "")) # Create timeline events timeline_events = [] for log in sorted_logs: timeline_events.append({ "timestamp": log.get("timestamp"), "event_type": log.get("category", "general"), "severity": log.get("level", "info"), "description": log.get("message", "")[:100] + "..." 
if len(log.get("message", "")) > 100 else log.get("message", ""), "component": log.get("component", "unknown") }) # Identify event sequences event_sequences = self._identify_event_sequences(timeline_events) return { "timeline_events": timeline_events, "event_sequences": event_sequences, "total_events": len(timeline_events), "time_span": { "start": sorted_logs[0].get("timestamp") if sorted_logs else None, "end": sorted_logs[-1].get("timestamp") if sorted_logs else None } } def _identify_event_sequences(self, events: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Identify related event sequences in timeline.""" sequences = [] # Simple sequence detection based on component and time proximity current_sequence = [] last_component = None last_time = None for event in events: event_time = event.get("timestamp") component = event.get("component") try: current_time = datetime.fromisoformat(event_time.replace("Z", "+00:00")) # Start new sequence if component changes or time gap > 5 minutes if (last_component and component != last_component) or \ (last_time and (current_time - last_time).total_seconds() > 300): if len(current_sequence) > 1: sequences.append({ "component": last_component, "event_count": len(current_sequence), "duration_seconds": (current_sequence[-1]["time"] - current_sequence[0]["time"]).total_seconds(), "events": current_sequence }) current_sequence = [] current_sequence.append({ "time": current_time, "description": event.get("description", "") }) last_component = component last_time = current_time except: continue # Add final sequence if len(current_sequence) > 1: sequences.append({ "component": last_component, "event_count": len(current_sequence), "events": current_sequence }) return sequences def _generate_log_insights(self, logs: List[Dict[str, Any]], search_pattern: str) -> List[str]: """Generate insights from log analysis.""" insights = [] if not logs: insights.append("No logs found matching the search criteria") return insights # Pattern-specific insights if search_pattern: insights.append(f"Found {len(logs)} logs matching pattern '{search_pattern}'") # Severity insights critical_logs = [log for log in logs if log.get("severity_score", 0) >= 90] if critical_logs: insights.append(f"Found {len(critical_logs)} critical severity logs requiring immediate attention") # Frequency insights if len(logs) > 100: insights.append("High log volume detected - consider investigating potential issues") # Component insights component_counts = Counter(log.get("component", "unknown") for log in logs) most_active = component_counts.most_common(1)[0] if component_counts else None if most_active: insights.append(f"Most active component: {most_active[0]} ({most_active[1]} logs)") # Category insights category_counts = Counter(log.get("category", "general") for log in logs) if "authentication" in category_counts and category_counts["authentication"] > 10: insights.append("High authentication activity detected") return insights # Helper methods for error log analysis async def _get_manager_error_logs(self, severity_filter: List[str], time_range_hours: int, max_errors: int) -> List[Dict[str, Any]]: """Get manager error logs with filtering.""" try: end_time = datetime.utcnow() start_time = end_time - timedelta(hours=time_range_hours) error_response = await self.api_client.get_manager_logs( levels=severity_filter, start_time=start_time.isoformat(), end_time=end_time.isoformat(), limit=max_errors ) return error_response.get("data", {}).get("logs", []) except Exception as e: self.logger.error(f"Error 
getting manager error logs: {str(e)}") # Return mock error data return self._generate_mock_error_data(severity_filter, max_errors) def _generate_mock_error_data(self, severity_filter: List[str], max_errors: int) -> List[Dict[str, Any]]: """Generate mock error data for demonstration.""" mock_errors = [] base_time = datetime.utcnow() error_templates = [ "Failed to connect to agent {agent_id}: Connection timeout", "Database connection lost: Unable to execute query", "Authentication failed for user {user}: Invalid credentials", "Cluster synchronization error: Node {node} unreachable", "Rule processing error: Invalid regex pattern in rule {rule_id}", "Memory allocation failed: Out of memory", "Configuration validation error: Invalid parameter {param}", "Network interface error: Cannot bind to port {port}" ] for i in range(min(max_errors, 50)): error_time = base_time - timedelta(minutes=i * 3) level = severity_filter[i % len(severity_filter)] if severity_filter else "error" template = error_templates[i % len(error_templates)] message = template.format( agent_id=f"00{i % 10}", user=f"user{i % 5}", node=f"node{i % 3}", rule_id=f"rule_{1000 + i}", param=f"param_{i}", port=8000 + (i % 100) ) mock_errors.append({ "timestamp": error_time.isoformat() + "Z", "level": level, "message": message, "component": "wazuh-manager", "error_code": f"E{1000 + i}", "thread_id": f"thread_{i % 5}" }) return mock_errors def _group_similar_errors(self, errors: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]: """Group similar errors together.""" groups = defaultdict(list) for error in errors: message = error.get("message", "") # Normalize message for grouping normalized = re.sub(r'\d+', 'X', message) # Replace numbers normalized = re.sub(r'[a-f0-9]{8,}', 'HASH', normalized) # Replace hashes normalized = re.sub(r'user\w+', 'USER', normalized) # Replace usernames normalized = re.sub(r'node\w+', 'NODE', normalized) # Replace node names groups[normalized].append(error) return dict(groups) def _analyze_error_groups(self, grouped_errors: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]: """Analyze grouped error data.""" group_analysis = {} for pattern, errors in grouped_errors.items(): group_analysis[pattern] = { "count": len(errors), "first_occurrence": min(error.get("timestamp", "") for error in errors), "last_occurrence": max(error.get("timestamp", "") for error in errors), "frequency_per_hour": len(errors) / 24, # Assuming 24 hour window "severity_levels": list(set(error.get("level", "unknown") for error in errors)), "affected_components": list(set(error.get("component", "unknown") for error in errors)), "sample_message": errors[0].get("message", "") if errors else "" } return group_analysis def _identify_critical_errors(self, errors: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Identify critical errors requiring immediate attention.""" critical_keywords = [ "out of memory", "segmentation fault", "core dump", "fatal", "cannot start", "shutdown", "panic", "corruption" ] critical_errors = [] for error in errors: message = error.get("message", "").lower() level = error.get("level", "").lower() is_critical = ( level in ["critical", "fatal"] or any(keyword in message for keyword in critical_keywords) ) if is_critical: critical_errors.append({ **error, "criticality_reason": self._determine_criticality_reason(error, critical_keywords) }) return critical_errors def _determine_criticality_reason(self, error: Dict[str, Any], keywords: List[str]) -> str: """Determine why an error is considered critical.""" message = 
error.get("message", "").lower() level = error.get("level", "").lower() if level in ["critical", "fatal"]: return f"Severity level: {level}" for keyword in keywords: if keyword in message: return f"Contains critical keyword: {keyword}" return "Unknown criticality reason" def _analyze_error_patterns(self, errors: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze patterns in error occurrences.""" patterns = { "temporal_patterns": self._analyze_temporal_error_patterns(errors), "component_patterns": Counter(error.get("component", "unknown") for error in errors), "error_cascades": self._detect_error_cascades(errors) } return { "temporal_patterns": patterns["temporal_patterns"], "component_distribution": dict(patterns["component_patterns"]), "error_cascades": patterns["error_cascades"], "most_problematic_component": patterns["component_patterns"].most_common(1)[0][0] if patterns["component_patterns"] else None } def _analyze_temporal_error_patterns(self, errors: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze when errors occur most frequently.""" hourly_counts = Counter() for error in errors: timestamp = error.get("timestamp", "") try: dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) hour_key = dt.strftime("%H:00") hourly_counts[hour_key] += 1 except: continue return { "hourly_distribution": dict(hourly_counts), "peak_error_hour": hourly_counts.most_common(1)[0][0] if hourly_counts else None, "error_frequency_trend": "increasing" if self._is_increasing_trend(hourly_counts) else "stable" } def _is_increasing_trend(self, hourly_counts: Counter) -> bool: """Determine if error frequency is increasing.""" if len(hourly_counts) < 4: return False # Simple trend detection: compare first and last quarters sorted_hours = sorted(hourly_counts.items()) first_quarter = sum(count for _, count in sorted_hours[:len(sorted_hours)//4]) last_quarter = sum(count for _, count in sorted_hours[-len(sorted_hours)//4:]) return last_quarter > first_quarter * 1.2 def _detect_error_cascades(self, errors: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Detect error cascades (related errors occurring in sequence).""" # Sort errors by timestamp sorted_errors = sorted(errors, key=lambda x: x.get("timestamp", "")) cascades = [] current_cascade = [] last_time = None for error in sorted_errors: try: error_time = datetime.fromisoformat(error.get("timestamp", "").replace("Z", "+00:00")) # Start new cascade if gap > 10 minutes if last_time and (error_time - last_time).total_seconds() > 600: if len(current_cascade) > 2: # Only report cascades with 3+ errors cascades.append({ "start_time": current_cascade[0].get("timestamp"), "end_time": current_cascade[-1].get("timestamp"), "error_count": len(current_cascade), "duration_minutes": ((datetime.fromisoformat(current_cascade[-1].get("timestamp", "").replace("Z", "+00:00")) - datetime.fromisoformat(current_cascade[0].get("timestamp", "").replace("Z", "+00:00"))).total_seconds() / 60), "primary_components": list(set(e.get("component", "unknown") for e in current_cascade)) }) current_cascade = [] current_cascade.append(error) last_time = error_time except: continue # Check final cascade if len(current_cascade) > 2: cascades.append({ "start_time": current_cascade[0].get("timestamp"), "end_time": current_cascade[-1].get("timestamp"), "error_count": len(current_cascade), "primary_components": list(set(e.get("component", "unknown") for e in current_cascade)) }) return cascades def _perform_root_cause_analysis(self, grouped_errors: Dict[str, List[Dict[str, Any]]]) -> 
Dict[str, Any]: """Perform automated root cause analysis on error groups.""" root_causes = {} for pattern, errors in grouped_errors.items(): if len(errors) < 2: # Skip single occurrences continue analysis = { "potential_causes": [], "recommended_actions": [], "confidence_score": 0 } # Analyze error pattern for root causes sample_message = errors[0].get("message", "").lower() if "connection" in sample_message and "timeout" in sample_message: analysis["potential_causes"].append("Network connectivity issues") analysis["recommended_actions"].append("Check network configuration and firewall rules") analysis["confidence_score"] = 85 elif "authentication" in sample_message and "failed" in sample_message: analysis["potential_causes"].append("Invalid credentials or authentication service issues") analysis["recommended_actions"].append("Verify user credentials and authentication service status") analysis["confidence_score"] = 90 elif "memory" in sample_message: analysis["potential_causes"].append("Insufficient system memory or memory leak") analysis["recommended_actions"].append("Monitor memory usage and consider increasing available memory") analysis["confidence_score"] = 80 elif "database" in sample_message: analysis["potential_causes"].append("Database connectivity or performance issues") analysis["recommended_actions"].append("Check database status and performance metrics") analysis["confidence_score"] = 75 else: analysis["potential_causes"].append("Configuration or system-level issue") analysis["recommended_actions"].append("Review system logs and configuration files") analysis["confidence_score"] = 50 # Add frequency-based insights if len(errors) > 10: analysis["potential_causes"].append("Recurring system issue requiring immediate attention") analysis["confidence_score"] = min(100, analysis["confidence_score"] + 10) root_causes[pattern] = analysis return root_causes def _analyze_error_trends(self, errors: List[Dict[str, Any]], time_range_hours: int) -> Dict[str, Any]: """Analyze error trends over time.""" # Group errors by time buckets bucket_size_minutes = max(60, time_range_hours * 60 // 24) # At least 1 hour buckets time_buckets = defaultdict(int) for error in errors: try: error_time = datetime.fromisoformat(error.get("timestamp", "").replace("Z", "+00:00")) # Round down to bucket boundary bucket_time = error_time.replace( minute=error_time.minute - error_time.minute % bucket_size_minutes, second=0, microsecond=0 ) time_buckets[bucket_time] += 1 except: continue # Calculate trend if len(time_buckets) >= 2: sorted_buckets = sorted(time_buckets.items()) error_counts = [count for _, count in sorted_buckets] # Simple linear trend if error_counts[-1] > error_counts[0]: trend_direction = "increasing" elif error_counts[-1] < error_counts[0]: trend_direction = "decreasing" else: trend_direction = "stable" else: trend_direction = "insufficient_data" return { "trend_direction": trend_direction, "total_time_buckets": len(time_buckets), "average_errors_per_bucket": sum(time_buckets.values()) / len(time_buckets) if time_buckets else 0, "peak_error_count": max(time_buckets.values()) if time_buckets else 0, "error_distribution": [ {"time": time.isoformat(), "count": count} for time, count in sorted(time_buckets.items()) ] } def _generate_error_remediation(self, analysis: Dict[str, Any]) -> Dict[str, Any]: """Generate remediation recommendations based on error analysis.""" remediation = { "immediate_actions": [], "short_term_actions": [], "long_term_actions": [], "preventive_measures": [] } overview = 
analysis.get("overview", {}) critical_errors = analysis.get("critical_errors", []) root_cause = analysis.get("root_cause_analysis", {}) # Immediate actions for critical errors if critical_errors: remediation["immediate_actions"].append({ "priority": "critical", "action": f"Address {len(critical_errors)} critical errors immediately", "timeline": "within 1 hour" }) # Actions based on root cause analysis for pattern, cause_analysis in root_cause.items(): if cause_analysis.get("confidence_score", 0) > 80: for action in cause_analysis.get("recommended_actions", []): remediation["short_term_actions"].append({ "action": action, "pattern": pattern[:50] + "..." if len(pattern) > 50 else pattern, "confidence": cause_analysis.get("confidence_score") }) # Long-term actions based on trends trends = analysis.get("trends", {}) if trends.get("trend_direction") == "increasing": remediation["long_term_actions"].append("Implement proactive monitoring to prevent error escalation") # Preventive measures remediation["preventive_measures"] = [ "Implement automated error detection and alerting", "Regular system health checks and maintenance", "Monitor system resources and performance metrics", "Maintain up-to-date system documentation" ] return remediation
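
Usage sketch: the handlers above are normally dispatched by the MCP server after tool-call validation, but they can also be exercised directly for testing. The snippet below is illustrative only; it assumes a ClusterTools instance named "tools" has already been constructed elsewhere with a configured api_client and logger (that wiring is not shown in this file), and the argument keys match the get_wazuh_cluster_health input schema defined above.

import asyncio

async def demo(tools):
    # Look up the handler the same way the server would, via the mapping
    # returned by get_handler_mapping().
    handler = tools.get_handler_mapping()["get_wazuh_cluster_health"]
    result = await handler({
        "include_performance": True,
        "include_sync_status": True,
        "health_threshold": 85,            # alert when cluster health falls below 85
        "node_filter": ["master-node"],    # hypothetical node name, optional filter
    })
    # The exact response envelope comes from BaseTool._format_response,
    # so we just print whatever it returns.
    print(result)

# asyncio.run(demo(tools))  # "tools" must be supplied by the hosting server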
