Skip to main content
Glama

Wazuh MCP Server

by gensecaihq
agents.py43.3 kB
"""Agent monitoring and management tools for Wazuh MCP Server.""" from typing import Any, Dict, List import mcp.types as types from datetime import datetime, timedelta from collections import defaultdict, Counter from .base import BaseTool from ..utils import validate_running_agents_query, validate_rules_summary_query class AgentTools(BaseTool): """Tools for Wazuh agent monitoring, health checking, and management.""" @property def tool_definitions(self) -> List[types.Tool]: """Return agent-related tool definitions.""" return [ types.Tool( name="get_wazuh_running_agents", description="Get comprehensive real-time agent health monitoring with performance metrics", inputSchema={ "type": "object", "properties": { "include_performance": { "type": "boolean", "description": "Include detailed performance metrics", "default": True }, "health_threshold": { "type": "number", "description": "Health score threshold (0-100) for filtering", "default": 0, "minimum": 0, "maximum": 100 }, "include_inactive": { "type": "boolean", "description": "Include inactive/disconnected agents", "default": False }, "group_filter": { "type": "array", "items": {"type": "string"}, "description": "Filter by agent groups (optional)" }, "os_filter": { "type": "array", "items": {"type": "string"}, "description": "Filter by operating system (optional)" } } } ), types.Tool( name="get_wazuh_rules_summary", description="Get comprehensive rule effectiveness and coverage analysis with performance insights", inputSchema={ "type": "object", "properties": { "include_effectiveness": { "type": "boolean", "description": "Include rule effectiveness analysis", "default": True }, "time_range_hours": { "type": "integer", "description": "Time range in hours for analysis", "default": 24, "minimum": 1, "maximum": 168 }, "rule_level_filter": { "type": "array", "items": {"type": "integer"}, "description": "Filter by rule levels (1-15)", "default": [] }, "include_coverage_gaps": { "type": "boolean", "description": "Analyze coverage gaps and recommendations", "default": True }, "group_by": { "type": "string", "description": "Group analysis by category", "enum": ["level", "category", "source", "compliance"], "default": "category" } } } ) ] def get_handler_mapping(self) -> Dict[str, callable]: """Return mapping of tool names to handler methods.""" return { "get_wazuh_running_agents": self.handle_running_agents, "get_wazuh_rules_summary": self.handle_rules_summary } async def handle_running_agents(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle running agents monitoring request.""" try: # Validate input validated_args = validate_running_agents_query(arguments) include_performance = validated_args.get("include_performance", True) health_threshold = validated_args.get("health_threshold", 0) include_inactive = validated_args.get("include_inactive", False) group_filter = validated_args.get("group_filter") os_filter = validated_args.get("os_filter") # Get agent data agents_data = await self._get_comprehensive_agent_data( include_inactive, group_filter, os_filter ) # Calculate health scores and filter agents_with_health = [] for agent in agents_data: health_score = self._calculate_agent_health_score(agent) agent["health_score"] = health_score agent["health_status"] = self._get_health_status(health_score) if health_score >= health_threshold: agents_with_health.append(agent) # Generate comprehensive analysis analysis = { "overview": self._generate_agents_overview(agents_with_health), "health_distribution": self._analyze_health_distribution(agents_with_health), "connectivity_status": self._analyze_connectivity(agents_with_health), "os_distribution": self._analyze_os_distribution(agents_with_health), "group_analysis": self._analyze_agent_groups(agents_with_health), "alerts": self._generate_health_alerts(agents_with_health) } if include_performance: analysis["performance_metrics"] = await self._get_performance_metrics(agents_with_health) # Add recommendations analysis["recommendations"] = self._generate_agent_recommendations(analysis) return self._format_response(analysis, metadata={ "source": "wazuh_agents", "analysis_type": "comprehensive_agent_monitoring", "filters_applied": { "health_threshold": health_threshold, "include_inactive": include_inactive, "groups": group_filter or "all", "os_types": os_filter or "all" } }) except Exception as e: self.logger.error(f"Error in running agents analysis: {str(e)}") return self._format_error_response(e, {"operation": "get_wazuh_running_agents"}) async def handle_rules_summary(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle rules summary and effectiveness analysis.""" try: # Validate input validated_args = validate_rules_summary_query(arguments) include_effectiveness = validated_args.get("include_effectiveness", True) time_range_hours = validated_args.get("time_range_hours", 24) rule_level_filter = validated_args.get("rule_level_filter", []) include_coverage_gaps = validated_args.get("include_coverage_gaps", True) group_by = validated_args.get("group_by", "category") # Get rules and alert data rules_data = await self._get_comprehensive_rules_data(rule_level_filter) if include_effectiveness: effectiveness_data = await self._analyze_rule_effectiveness( rules_data, time_range_hours ) else: effectiveness_data = {} # Generate comprehensive analysis analysis = { "overview": self._generate_rules_overview(rules_data), "distribution": self._analyze_rules_distribution(rules_data, group_by), "compliance_mapping": self._analyze_compliance_coverage(rules_data), "performance_analysis": self._analyze_rule_performance(rules_data) } if include_effectiveness: analysis["effectiveness"] = effectiveness_data if include_coverage_gaps: analysis["coverage_gaps"] = await self._analyze_coverage_gaps(rules_data) # Add actionable recommendations analysis["recommendations"] = self._generate_rules_recommendations(analysis) return self._format_response(analysis, metadata={ "source": "wazuh_rules", "analysis_type": "comprehensive_rules_analysis", "time_range_hours": time_range_hours, "group_by": group_by }) except Exception as e: self.logger.error(f"Error in rules summary analysis: {str(e)}") return self._format_error_response(e, {"operation": "get_wazuh_rules_summary"}) # Helper methods for agent monitoring async def _get_comprehensive_agent_data(self, include_inactive: bool, group_filter: List[str] = None, os_filter: List[str] = None) -> List[Dict[str, Any]]: """Get comprehensive agent data from Wazuh.""" # Get basic agent information agents_response = await self.api_client.get_agents( status="all" if include_inactive else "active" ) agents = agents_response.get("data", {}).get("affected_items", []) # Apply filters filtered_agents = [] for agent in agents: # Group filter if group_filter: agent_groups = agent.get("group", []) if not any(group in agent_groups for group in group_filter): continue # OS filter if os_filter: agent_os = agent.get("os", {}).get("platform", "").lower() if not any(os_type.lower() in agent_os for os_type in os_filter): continue # Enrich agent data enriched_agent = await self._enrich_agent_data(agent) filtered_agents.append(enriched_agent) return filtered_agents async def _enrich_agent_data(self, agent: Dict[str, Any]) -> Dict[str, Any]: """Enrich agent data with additional metrics.""" agent_id = agent.get("id") # Get additional agent information try: # Agent stats stats_response = await self.api_client.get_agent_stats(agent_id) agent_stats = stats_response.get("data", {}) # Recent alerts for this agent alerts_response = await self.api_client.get_alerts( agent_id=agent_id, limit=100, time_range=3600 # Last hour ) recent_alerts = alerts_response.get("data", {}).get("affected_items", []) return { **agent, "stats": agent_stats, "recent_alerts_count": len(recent_alerts), "recent_alerts": recent_alerts[:5], # Last 5 alerts "last_alert_time": recent_alerts[0].get("timestamp") if recent_alerts else None, "enrichment_timestamp": datetime.utcnow().isoformat() } except Exception as e: self.logger.warning(f"Could not enrich agent {agent_id}: {str(e)}") return { **agent, "stats": {}, "recent_alerts_count": 0, "recent_alerts": [], "enrichment_error": str(e) } def _calculate_agent_health_score(self, agent: Dict[str, Any]) -> int: """Calculate comprehensive health score for agent (0-100).""" score = 0 # Connection status (40 points) status = agent.get("status", "").lower() if status == "active": score += 40 elif status == "pending": score += 20 elif status == "never_connected": score += 0 else: # disconnected score += 10 # Last keep alive (20 points) last_keep_alive = agent.get("lastKeepAlive") if last_keep_alive: try: last_time = datetime.fromisoformat(last_keep_alive.replace("Z", "+00:00")) minutes_ago = (datetime.utcnow() - last_time.replace(tzinfo=None)).total_seconds() / 60 if minutes_ago <= 5: score += 20 elif minutes_ago <= 15: score += 15 elif minutes_ago <= 60: score += 10 else: score += 0 except: score += 5 # Partial credit for having timestamp # Recent alerts activity (20 points) alerts_count = agent.get("recent_alerts_count", 0) if alerts_count > 0: # Active monitoring is good, but not too many alerts if alerts_count <= 10: score += 20 elif alerts_count <= 50: score += 15 else: score += 10 # Too many alerts might indicate issues else: score += 10 # No alerts could be good or bad # Agent configuration (10 points) if agent.get("group"): score += 5 if agent.get("version"): score += 5 # OS information completeness (10 points) os_info = agent.get("os", {}) if os_info.get("platform"): score += 3 if os_info.get("version"): score += 3 if os_info.get("name"): score += 4 return min(score, 100) def _get_health_status(self, health_score: int) -> str: """Get health status label from score.""" if health_score >= 80: return "excellent" elif health_score >= 60: return "good" elif health_score >= 40: return "fair" elif health_score >= 20: return "poor" else: return "critical" def _generate_agents_overview(self, agents: List[Dict[str, Any]]) -> Dict[str, Any]: """Generate overview statistics for agents.""" if not agents: return {"total": 0, "message": "No agents found"} status_counts = Counter(agent.get("status", "unknown") for agent in agents) health_scores = [agent.get("health_score", 0) for agent in agents] os_counts = Counter( agent.get("os", {}).get("platform", "unknown") for agent in agents ) return { "total_agents": len(agents), "status_distribution": dict(status_counts), "health_metrics": { "average_health_score": sum(health_scores) / len(health_scores), "healthy_agents": sum(1 for score in health_scores if score >= 80), "agents_needing_attention": sum(1 for score in health_scores if score < 60) }, "os_distribution": dict(os_counts.most_common()), "recent_activity": { "agents_with_recent_alerts": sum( 1 for agent in agents if agent.get("recent_alerts_count", 0) > 0 ), "total_recent_alerts": sum( agent.get("recent_alerts_count", 0) for agent in agents ) } } def _analyze_health_distribution(self, agents: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze distribution of agent health scores.""" health_statuses = Counter(agent.get("health_status", "unknown") for agent in agents) # Group agents by health score ranges score_ranges = { "90-100": 0, "80-89": 0, "70-79": 0, "60-69": 0, "50-59": 0, "0-49": 0 } for agent in agents: score = agent.get("health_score", 0) if score >= 90: score_ranges["90-100"] += 1 elif score >= 80: score_ranges["80-89"] += 1 elif score >= 70: score_ranges["70-79"] += 1 elif score >= 60: score_ranges["60-69"] += 1 elif score >= 50: score_ranges["50-59"] += 1 else: score_ranges["0-49"] += 1 return { "by_status": dict(health_statuses), "by_score_range": score_ranges, "critical_agents": [ { "id": agent.get("id"), "name": agent.get("name"), "health_score": agent.get("health_score"), "status": agent.get("status"), "issues": self._identify_agent_issues(agent) } for agent in agents if agent.get("health_score", 0) < 40 ] } def _analyze_connectivity(self, agents: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze agent connectivity patterns.""" connectivity_issues = [] last_contact_analysis = {"within_hour": 0, "within_day": 0, "older": 0} for agent in agents: last_keep_alive = agent.get("lastKeepAlive") if last_keep_alive: try: last_time = datetime.fromisoformat(last_keep_alive.replace("Z", "+00:00")) hours_ago = (datetime.utcnow() - last_time.replace(tzinfo=None)).total_seconds() / 3600 if hours_ago <= 1: last_contact_analysis["within_hour"] += 1 elif hours_ago <= 24: last_contact_analysis["within_day"] += 1 else: last_contact_analysis["older"] += 1 # Flag connectivity issues if hours_ago > 24 and agent.get("status") == "active": connectivity_issues.append({ "agent_id": agent.get("id"), "agent_name": agent.get("name"), "last_contact": last_keep_alive, "hours_since_contact": round(hours_ago, 2) }) except: last_contact_analysis["older"] += 1 return { "last_contact_distribution": last_contact_analysis, "connectivity_issues": connectivity_issues, "connectivity_health": "good" if len(connectivity_issues) == 0 else "needs_attention" } def _analyze_os_distribution(self, agents: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze operating system distribution.""" os_details = defaultdict(lambda: {"count": 0, "versions": set(), "agents": []}) for agent in agents: os_info = agent.get("os", {}) platform = os_info.get("platform", "unknown") version = os_info.get("version", "unknown") os_details[platform]["count"] += 1 os_details[platform]["versions"].add(version) os_details[platform]["agents"].append({ "id": agent.get("id"), "name": agent.get("name"), "version": version }) # Convert sets to lists for JSON serialization for platform_data in os_details.values(): platform_data["versions"] = list(platform_data["versions"]) platform_data["unique_versions"] = len(platform_data["versions"]) return { "platforms": dict(os_details), "diversity_score": len(os_details), "version_fragmentation": { platform: data["unique_versions"] for platform, data in os_details.items() } } def _analyze_agent_groups(self, agents: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze agent group distribution and health.""" group_analysis = defaultdict(lambda: { "agents": [], "health_scores": [], "status_distribution": Counter() }) for agent in agents: groups = agent.get("group", ["default"]) for group in groups: group_data = group_analysis[group] group_data["agents"].append(agent.get("id")) group_data["health_scores"].append(agent.get("health_score", 0)) group_data["status_distribution"][agent.get("status", "unknown")] += 1 # Calculate group health metrics group_health = {} for group, data in group_analysis.items(): health_scores = data["health_scores"] group_health[group] = { "agent_count": len(data["agents"]), "average_health": sum(health_scores) / len(health_scores) if health_scores else 0, "status_distribution": dict(data["status_distribution"]), "health_rating": self._get_health_status( sum(health_scores) / len(health_scores) if health_scores else 0 ) } return { "groups": group_health, "total_groups": len(group_analysis), "healthiest_group": max(group_health.items(), key=lambda x: x[1]["average_health"])[0] if group_health else None, "groups_needing_attention": [ group for group, data in group_health.items() if data["average_health"] < 60 ] } def _generate_health_alerts(self, agents: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Generate health alerts for agents.""" alerts = [] # Critical health agents critical_agents = [a for a in agents if a.get("health_score", 0) < 40] if critical_agents: alerts.append({ "level": "critical", "type": "poor_health", "message": f"{len(critical_agents)} agents have critical health scores", "affected_agents": [a.get("id") for a in critical_agents], "recommendation": "Investigate connectivity and configuration issues" }) # Disconnected agents disconnected = [a for a in agents if a.get("status") == "disconnected"] if disconnected: alerts.append({ "level": "warning", "type": "disconnected", "message": f"{len(disconnected)} agents are disconnected", "affected_agents": [a.get("id") for a in disconnected], "recommendation": "Check network connectivity and agent services" }) # High alert volume high_alert_agents = [a for a in agents if a.get("recent_alerts_count", 0) > 50] if high_alert_agents: alerts.append({ "level": "warning", "type": "high_alert_volume", "message": f"{len(high_alert_agents)} agents have unusually high alert volume", "affected_agents": [a.get("id") for a in high_alert_agents], "recommendation": "Review alert configuration and investigate potential issues" }) return alerts async def _get_performance_metrics(self, agents: List[Dict[str, Any]]) -> Dict[str, Any]: """Get performance metrics for agents.""" performance_data = { "response_times": [], "memory_usage": [], "cpu_usage": [], "network_metrics": [] } for agent in agents: stats = agent.get("stats", {}) # Mock performance data (would come from actual agent stats) if stats: performance_data["response_times"].append({ "agent_id": agent.get("id"), "response_time_ms": stats.get("response_time", 100) }) performance_data["memory_usage"].append({ "agent_id": agent.get("id"), "memory_mb": stats.get("memory_usage", 50) }) return { "summary": { "total_agents_with_metrics": len([a for a in agents if a.get("stats")]), "average_response_time": sum( m["response_time_ms"] for m in performance_data["response_times"] ) / len(performance_data["response_times"]) if performance_data["response_times"] else 0 }, "details": performance_data } def _generate_agent_recommendations(self, analysis: Dict[str, Any]) -> List[str]: """Generate recommendations based on agent analysis.""" recommendations = [] overview = analysis.get("overview", {}) health_metrics = overview.get("health_metrics", {}) # Health-based recommendations if health_metrics.get("agents_needing_attention", 0) > 0: recommendations.append( f"Address {health_metrics['agents_needing_attention']} agents with poor health scores" ) # Connectivity recommendations connectivity = analysis.get("connectivity_status", {}) if connectivity.get("connectivity_issues", []): recommendations.append("Investigate connectivity issues with flagged agents") # Group recommendations group_analysis = analysis.get("group_analysis", {}) problematic_groups = group_analysis.get("groups_needing_attention", []) if problematic_groups: recommendations.append(f"Review configuration for groups: {', '.join(problematic_groups)}") # OS diversity recommendations os_dist = analysis.get("os_distribution", {}) if os_dist.get("diversity_score", 0) > 5: recommendations.append("Consider standardizing OS versions for easier management") return recommendations # Helper methods for rules analysis async def _get_comprehensive_rules_data(self, level_filter: List[int] = None) -> List[Dict[str, Any]]: """Get comprehensive rules data from Wazuh.""" # Get rules information rules_response = await self.api_client.get_rules() rules = rules_response.get("data", {}).get("affected_items", []) # Apply level filter if level_filter: rules = [rule for rule in rules if rule.get("level", 0) in level_filter] # Enrich rules with additional metadata enriched_rules = [] for rule in rules: enriched_rule = { **rule, "effectiveness_score": self._calculate_rule_effectiveness(rule), "compliance_mappings": self._get_rule_compliance_mappings(rule), "category": self._categorize_rule(rule) } enriched_rules.append(enriched_rule) return enriched_rules async def _analyze_rule_effectiveness(self, rules: List[Dict[str, Any]], time_range_hours: int) -> Dict[str, Any]: """Analyze rule effectiveness based on alert patterns.""" effectiveness_data = { "active_rules": [], "inactive_rules": [], "top_performing_rules": [], "underperforming_rules": [] } # Get alerts for the time range end_time = datetime.utcnow() start_time = end_time - timedelta(hours=time_range_hours) alerts_response = await self.api_client.get_alerts( limit=10000, time_range=time_range_hours * 3600 ) alerts = alerts_response.get("data", {}).get("affected_items", []) # Analyze rule usage rule_usage = Counter(alert.get("rule", {}).get("id") for alert in alerts) for rule in rules: rule_id = rule.get("id") alert_count = rule_usage.get(rule_id, 0) rule_analysis = { "rule_id": rule_id, "description": rule.get("description", ""), "level": rule.get("level", 0), "alerts_generated": alert_count, "effectiveness_score": rule.get("effectiveness_score", 0) } if alert_count > 0: effectiveness_data["active_rules"].append(rule_analysis) if alert_count > 10: # High activity effectiveness_data["top_performing_rules"].append(rule_analysis) else: effectiveness_data["inactive_rules"].append(rule_analysis) if rule.get("level", 0) >= 10: # High level but no alerts effectiveness_data["underperforming_rules"].append(rule_analysis) return effectiveness_data def _calculate_rule_effectiveness(self, rule: Dict[str, Any]) -> int: """Calculate effectiveness score for a rule (0-100).""" score = 0 # Level scoring (0-30 points) level = rule.get("level", 0) score += min(level * 2, 30) # Groups and categories (0-20 points) groups = rule.get("groups", []) if groups: score += 10 # Bonus for important categories important_groups = ["authentication", "firewall", "intrusion_detection", "malware"] if any(group in important_groups for group in groups): score += 10 # Description quality (0-20 points) description = rule.get("description", "") if description: score += 10 if len(description) > 50: # Detailed description score += 10 # Compliance mappings (0-15 points) if rule.get("pci_dss"): score += 5 if rule.get("hipaa"): score += 5 if rule.get("gdpr"): score += 5 # Rule complexity (0-15 points) if rule.get("regex"): score += 5 if rule.get("decoded_as"): score += 5 if rule.get("if_sid"): score += 5 return min(score, 100) def _get_rule_compliance_mappings(self, rule: Dict[str, Any]) -> List[str]: """Get compliance framework mappings for rule.""" mappings = [] if rule.get("pci_dss"): mappings.append("PCI_DSS") if rule.get("hipaa"): mappings.append("HIPAA") if rule.get("gdpr"): mappings.append("GDPR") if rule.get("tsc"): mappings.append("TSC") if rule.get("nist_800_53"): mappings.append("NIST_800_53") return mappings def _categorize_rule(self, rule: Dict[str, Any]) -> str: """Categorize rule based on groups and description.""" groups = rule.get("groups", []) description = rule.get("description", "").lower() # Category mapping if any("authentication" in group for group in groups) or "login" in description: return "authentication" elif any("firewall" in group for group in groups) or "firewall" in description: return "network_security" elif any("intrusion" in group for group in groups) or "intrusion" in description: return "intrusion_detection" elif any("malware" in group for group in groups) or "malware" in description: return "malware_detection" elif any("compliance" in group for group in groups): return "compliance" elif any("web" in group for group in groups) or "web" in description: return "web_security" elif any("system" in group for group in groups) or "system" in description: return "system_monitoring" else: return "general" def _generate_rules_overview(self, rules: List[Dict[str, Any]]) -> Dict[str, Any]: """Generate overview statistics for rules.""" if not rules: return {"total": 0, "message": "No rules found"} level_dist = Counter(rule.get("level", 0) for rule in rules) category_dist = Counter(rule.get("category", "unknown") for rule in rules) effectiveness_scores = [rule.get("effectiveness_score", 0) for rule in rules] return { "total_rules": len(rules), "level_distribution": dict(level_dist), "category_distribution": dict(category_dist), "effectiveness_metrics": { "average_effectiveness": sum(effectiveness_scores) / len(effectiveness_scores), "high_effectiveness_rules": sum(1 for score in effectiveness_scores if score >= 80), "low_effectiveness_rules": sum(1 for score in effectiveness_scores if score < 40) }, "compliance_coverage": { framework: sum(1 for rule in rules if framework in rule.get("compliance_mappings", [])) for framework in ["PCI_DSS", "HIPAA", "GDPR", "NIST_800_53"] } } def _analyze_rules_distribution(self, rules: List[Dict[str, Any]], group_by: str) -> Dict[str, Any]: """Analyze rules distribution by specified grouping.""" distribution = defaultdict(list) for rule in rules: if group_by == "level": key = rule.get("level", 0) elif group_by == "category": key = rule.get("category", "unknown") elif group_by == "source": key = rule.get("filename", "unknown") elif group_by == "compliance": mappings = rule.get("compliance_mappings", []) key = mappings[0] if mappings else "none" else: key = "all" distribution[key].append(rule) # Generate statistics for each group group_stats = {} for key, group_rules in distribution.items(): effectiveness_scores = [r.get("effectiveness_score", 0) for r in group_rules] group_stats[str(key)] = { "count": len(group_rules), "average_effectiveness": sum(effectiveness_scores) / len(effectiveness_scores) if effectiveness_scores else 0, "level_range": { "min": min(r.get("level", 0) for r in group_rules), "max": max(r.get("level", 0) for r in group_rules) } if group_rules else {"min": 0, "max": 0} } return { "grouped_by": group_by, "groups": group_stats, "total_groups": len(distribution) } def _analyze_compliance_coverage(self, rules: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze compliance framework coverage.""" frameworks = ["PCI_DSS", "HIPAA", "GDPR", "NIST_800_53", "TSC"] coverage = {} for framework in frameworks: framework_rules = [ rule for rule in rules if framework in rule.get("compliance_mappings", []) ] coverage[framework] = { "total_rules": len(framework_rules), "coverage_percentage": (len(framework_rules) / len(rules)) * 100 if rules else 0, "high_level_rules": len([r for r in framework_rules if r.get("level", 0) >= 10]), "categories_covered": len(set(r.get("category") for r in framework_rules)) } return coverage def _analyze_rule_performance(self, rules: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze rule performance characteristics.""" # Group rules by performance characteristics high_performance = [r for r in rules if r.get("effectiveness_score", 0) >= 80] medium_performance = [r for r in rules if 40 <= r.get("effectiveness_score", 0) < 80] low_performance = [r for r in rules if r.get("effectiveness_score", 0) < 40] return { "performance_tiers": { "high": { "count": len(high_performance), "percentage": (len(high_performance) / len(rules)) * 100 if rules else 0 }, "medium": { "count": len(medium_performance), "percentage": (len(medium_performance) / len(rules)) * 100 if rules else 0 }, "low": { "count": len(low_performance), "percentage": (len(low_performance) / len(rules)) * 100 if rules else 0 } }, "optimization_candidates": [ { "rule_id": rule.get("id"), "description": rule.get("description", ""), "current_score": rule.get("effectiveness_score", 0), "level": rule.get("level", 0) } for rule in low_performance[:10] # Top 10 candidates for optimization ] } async def _analyze_coverage_gaps(self, rules: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze potential coverage gaps in rule set.""" gaps = { "category_gaps": [], "compliance_gaps": [], "level_gaps": [], "recommendations": [] } # Analyze category coverage categories = set(rule.get("category") for rule in rules) expected_categories = { "authentication", "network_security", "intrusion_detection", "malware_detection", "web_security", "system_monitoring", "compliance" } missing_categories = expected_categories - categories if missing_categories: gaps["category_gaps"] = list(missing_categories) gaps["recommendations"].append( f"Consider adding rules for missing categories: {', '.join(missing_categories)}" ) # Analyze level distribution gaps level_dist = Counter(rule.get("level", 0) for rule in rules) if not any(level >= 12 for level in level_dist.keys()): gaps["level_gaps"].append("No critical level rules (12+)") gaps["recommendations"].append("Add critical level rules for high-priority threats") # Analyze compliance gaps frameworks = ["PCI_DSS", "HIPAA", "GDPR"] for framework in frameworks: framework_rules = [ r for r in rules if framework in r.get("compliance_mappings", []) ] coverage_percent = (len(framework_rules) / len(rules)) * 100 if rules else 0 if coverage_percent < 20: # Less than 20% coverage gaps["compliance_gaps"].append(f"Low {framework} coverage ({coverage_percent:.1f}%)") gaps["recommendations"].append(f"Increase {framework} compliance rule coverage") return gaps def _generate_rules_recommendations(self, analysis: Dict[str, Any]) -> List[str]: """Generate recommendations based on rules analysis.""" recommendations = [] overview = analysis.get("overview", {}) effectiveness = overview.get("effectiveness_metrics", {}) # Effectiveness recommendations if effectiveness.get("low_effectiveness_rules", 0) > 0: recommendations.append( f"Review and optimize {effectiveness['low_effectiveness_rules']} low-effectiveness rules" ) # Coverage recommendations coverage_gaps = analysis.get("coverage_gaps", {}) if coverage_gaps.get("category_gaps"): recommendations.append("Add rules for missing security categories") # Performance recommendations performance = analysis.get("performance_analysis", {}) optimization_candidates = performance.get("optimization_candidates", []) if optimization_candidates: recommendations.append(f"Optimize {len(optimization_candidates)} underperforming rules") # Compliance recommendations compliance = analysis.get("compliance_mapping", {}) low_coverage_frameworks = [ framework for framework, data in compliance.items() if data.get("coverage_percentage", 0) < 50 ] if low_coverage_frameworks: recommendations.append( f"Improve compliance coverage for: {', '.join(low_coverage_frameworks)}" ) return recommendations def _identify_agent_issues(self, agent: Dict[str, Any]) -> List[str]: """Identify specific issues with an agent.""" issues = [] # Status issues if agent.get("status") == "disconnected": issues.append("Agent disconnected") elif agent.get("status") == "never_connected": issues.append("Agent never connected") # Connectivity issues last_keep_alive = agent.get("lastKeepAlive") if last_keep_alive: try: last_time = datetime.fromisoformat(last_keep_alive.replace("Z", "+00:00")) hours_ago = (datetime.utcnow() - last_time.replace(tzinfo=None)).total_seconds() / 3600 if hours_ago > 24: issues.append(f"No contact for {int(hours_ago)} hours") except: issues.append("Invalid last keep alive timestamp") # Alert volume issues alert_count = agent.get("recent_alerts_count", 0) if alert_count > 100: issues.append("Unusually high alert volume") elif alert_count == 0: issues.append("No recent alerts (possible monitoring gap)") # Configuration issues if not agent.get("group"): issues.append("No group assignment") return issues

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gensecaihq/Wazuh-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server