Skip to main content
Glama

Wazuh MCP Server

by gensecaihq
vulnerabilities.py32.3 kB
"""Vulnerability analysis tools for Wazuh MCP Server.""" from typing import Any, Dict, List import mcp.types as types from datetime import datetime, timedelta from collections import defaultdict, Counter import re from .base import BaseTool from ..utils import validate_vulnerability_summary_query, validate_critical_vulnerabilities_query class VulnerabilityTools(BaseTool): """Tools for Wazuh vulnerability analysis and management.""" @property def tool_definitions(self) -> List[types.Tool]: """Return vulnerability-related tool definitions.""" return [ types.Tool( name="get_wazuh_vulnerability_summary", description="Get comprehensive vulnerability assessment across infrastructure with risk scoring", inputSchema={ "type": "object", "properties": { "severity_filter": { "type": "array", "items": {"type": "string"}, "description": "Filter by severity levels", "default": ["critical", "high", "medium", "low"] }, "include_remediation": { "type": "boolean", "description": "Include remediation recommendations", "default": True }, "agent_filter": { "type": "array", "items": {"type": "string"}, "description": "Filter by specific agent IDs (optional)" }, "package_filter": { "type": "string", "description": "Filter by package name pattern (optional)" } } } ), types.Tool( name="get_wazuh_critical_vulnerabilities", description="Get critical vulnerabilities with exploit intelligence and priority scoring", inputSchema={ "type": "object", "properties": { "include_exploit_data": { "type": "boolean", "description": "Include exploit availability and CVSS data", "default": True }, "priority_threshold": { "type": "number", "description": "Minimum priority score (0-100)", "default": 70, "minimum": 0, "maximum": 100 }, "time_range_days": { "type": "integer", "description": "Look for vulnerabilities discovered in last N days", "default": 30, "minimum": 1, "maximum": 365 }, "include_trending": { "type": "boolean", "description": "Include trending vulnerability analysis", "default": True } } } ) ] def get_handler_mapping(self) -> Dict[str, callable]: """Return mapping of tool names to handler methods.""" return { "get_wazuh_vulnerability_summary": self.handle_vulnerability_summary, "get_wazuh_critical_vulnerabilities": self.handle_critical_vulnerabilities } async def handle_vulnerability_summary(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle comprehensive vulnerability summary request.""" try: # Validate input validated_args = validate_vulnerability_summary_query(arguments) severity_filter = validated_args.get("severity_filter", ["critical", "high", "medium", "low"]) include_remediation = validated_args.get("include_remediation", True) agent_filter = validated_args.get("agent_filter") package_filter = validated_args.get("package_filter") # Get vulnerability data from multiple sources vuln_data = await self._get_comprehensive_vulnerability_data( severity_filter, agent_filter, package_filter ) # Generate comprehensive summary summary = { "overview": self._generate_vulnerability_overview(vuln_data), "risk_assessment": self._assess_infrastructure_risk(vuln_data), "distribution": self._analyze_vulnerability_distribution(vuln_data), "affected_systems": self._analyze_affected_systems(vuln_data), "package_analysis": self._analyze_vulnerable_packages(vuln_data), "timeline_analysis": self._analyze_vulnerability_timeline(vuln_data) } if include_remediation: summary["remediation"] = self._generate_remediation_recommendations(vuln_data) # Add compliance impact summary["compliance_impact"] = self._assess_compliance_impact(vuln_data) return self._format_response(summary, metadata={ "source": "wazuh_vulnerability_detector", "analysis_type": "comprehensive_vulnerability_summary", "filters_applied": { "severity": severity_filter, "agents": len(agent_filter) if agent_filter else "all", "package_pattern": package_filter or "all" } }) except Exception as e: self.logger.error(f"Error in vulnerability summary: {str(e)}") return self._format_error_response(e, {"operation": "get_wazuh_vulnerability_summary"}) async def handle_critical_vulnerabilities(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle critical vulnerabilities with exploit intelligence.""" try: # Validate input validated_args = validate_critical_vulnerabilities_query(arguments) include_exploit_data = validated_args.get("include_exploit_data", True) priority_threshold = validated_args.get("priority_threshold", 70) time_range_days = validated_args.get("time_range_days", 30) include_trending = validated_args.get("include_trending", True) # Get critical vulnerability data critical_vulns = await self._get_critical_vulnerabilities( priority_threshold, time_range_days ) # Enrich with exploit intelligence enriched_vulns = [] for vuln in critical_vulns: enriched = await self._enrich_with_exploit_data(vuln, include_exploit_data) enriched_vulns.append(enriched) # Sort by priority score enriched_vulns.sort(key=lambda x: x.get("priority_score", 0), reverse=True) # Generate analysis analysis = { "summary": { "total_critical": len(enriched_vulns), "with_exploits": sum(1 for v in enriched_vulns if v.get("exploit_available", False)), "recently_discovered": sum(1 for v in enriched_vulns if v.get("days_since_discovery", 0) <= 7), "average_priority": sum(v.get("priority_score", 0) for v in enriched_vulns) / len(enriched_vulns) if enriched_vulns else 0 }, "critical_vulnerabilities": enriched_vulns[:20], # Top 20 by priority "exploit_intelligence": self._analyze_exploit_landscape(enriched_vulns), "attack_vectors": self._analyze_attack_vectors(enriched_vulns), "immediate_actions": self._generate_immediate_actions(enriched_vulns) } if include_trending: analysis["trending_analysis"] = self._analyze_vulnerability_trends(enriched_vulns) return self._format_response(analysis, metadata={ "source": "wazuh_vulnerability_detector", "analysis_type": "critical_vulnerability_analysis", "priority_threshold": priority_threshold, "time_range_days": time_range_days }) except Exception as e: self.logger.error(f"Error in critical vulnerabilities analysis: {str(e)}") return self._format_error_response(e, {"operation": "get_wazuh_critical_vulnerabilities"}) # Helper methods for vulnerability analysis async def _get_comprehensive_vulnerability_data(self, severity_filter: List[str], agent_filter: List[str] = None, package_filter: str = None) -> List[Dict[str, Any]]: """Get comprehensive vulnerability data from Wazuh.""" vulnerabilities = [] # Get vulnerability data from vulnerability detector vuln_response = await self.api_client.get_vulnerabilities( severity=severity_filter, agent_ids=agent_filter ) raw_vulns = vuln_response.get("data", {}).get("affected_items", []) for vuln in raw_vulns: # Apply package filter if specified if package_filter: package_name = vuln.get("name", "") if not re.search(package_filter, package_name, re.IGNORECASE): continue # Enrich vulnerability data enriched_vuln = { **vuln, "risk_score": self._calculate_risk_score(vuln), "business_impact": self._assess_business_impact(vuln), "remediation_complexity": self._assess_remediation_complexity(vuln) } vulnerabilities.append(enriched_vuln) return vulnerabilities async def _get_critical_vulnerabilities(self, priority_threshold: float, time_range_days: int) -> List[Dict[str, Any]]: """Get critical vulnerabilities based on priority threshold.""" # Calculate date range end_date = datetime.utcnow() start_date = end_date - timedelta(days=time_range_days) # Get vulnerability data vuln_response = await self.api_client.get_vulnerabilities( severity=["critical", "high"], date_from=start_date.strftime("%Y-%m-%d"), date_to=end_date.strftime("%Y-%m-%d") ) vulnerabilities = vuln_response.get("data", {}).get("affected_items", []) # Filter by priority threshold and enrich critical_vulns = [] for vuln in vulnerabilities: priority_score = self._calculate_priority_score(vuln) if priority_score >= priority_threshold: vuln["priority_score"] = priority_score vuln["days_since_discovery"] = self._calculate_days_since_discovery(vuln) critical_vulns.append(vuln) return critical_vulns async def _enrich_with_exploit_data(self, vuln: Dict[str, Any], include_exploit_data: bool) -> Dict[str, Any]: """Enrich vulnerability with exploit intelligence.""" if not include_exploit_data: return vuln cve_id = vuln.get("cve") if not cve_id: return vuln # Mock exploit data (in real implementation, this would query external APIs) exploit_data = self._get_mock_exploit_data(cve_id) return { **vuln, "exploit_available": exploit_data.get("exploit_available", False), "exploit_maturity": exploit_data.get("maturity", "unknown"), "exploit_complexity": exploit_data.get("complexity", "unknown"), "public_exploits": exploit_data.get("public_exploits", 0), "cvss_score": exploit_data.get("cvss_score", 0.0), "threat_intelligence": exploit_data.get("threat_intel", {}) } def _calculate_risk_score(self, vuln: Dict[str, Any]) -> int: """Calculate comprehensive risk score for vulnerability.""" base_score = 0 # Severity scoring severity = vuln.get("severity", "").lower() severity_scores = {"critical": 40, "high": 30, "medium": 20, "low": 10} base_score += severity_scores.get(severity, 5) # CVSS scoring cvss = vuln.get("cvss", {}) if isinstance(cvss, dict): cvss_score = cvss.get("cvss3", {}).get("score", 0) base_score += min(int(cvss_score * 6), 30) # Asset criticality (would come from asset inventory) agent_id = vuln.get("agent", {}).get("id", "") if agent_id in getattr(self.config, 'critical_agents', []): base_score += 20 # Package criticality package_name = vuln.get("name", "").lower() critical_packages = ["kernel", "openssl", "openssh", "apache", "nginx"] if any(pkg in package_name for pkg in critical_packages): base_score += 10 return min(base_score, 100) def _assess_business_impact(self, vuln: Dict[str, Any]) -> str: """Assess business impact of vulnerability.""" risk_score = self._calculate_risk_score(vuln) if risk_score >= 80: return "critical" elif risk_score >= 60: return "high" elif risk_score >= 40: return "medium" else: return "low" def _assess_remediation_complexity(self, vuln: Dict[str, Any]) -> str: """Assess complexity of remediation.""" package_name = vuln.get("name", "").lower() # Kernel updates are typically complex if "kernel" in package_name: return "high" # System libraries are medium complexity system_libs = ["glibc", "openssl", "zlib", "curl"] if any(lib in package_name for lib in system_libs): return "medium" # Application packages are typically low complexity return "low" def _calculate_priority_score(self, vuln: Dict[str, Any]) -> float: """Calculate priority score for vulnerability triage.""" score = 0.0 # Base CVSS score (0-40 points) cvss = vuln.get("cvss", {}) if isinstance(cvss, dict): cvss_score = cvss.get("cvss3", {}).get("score", 0) score += cvss_score * 4 # Exploit availability (0-30 points) if vuln.get("exploit_available", False): score += 30 # Age factor (0-20 points) - newer vulnerabilities get higher scores days_old = vuln.get("days_since_discovery", 365) if days_old <= 7: score += 20 elif days_old <= 30: score += 15 elif days_old <= 90: score += 10 # Asset criticality (0-10 points) agent_id = vuln.get("agent", {}).get("id", "") if agent_id in getattr(self.config, 'critical_agents', []): score += 10 return min(score, 100.0) def _calculate_days_since_discovery(self, vuln: Dict[str, Any]) -> int: """Calculate days since vulnerability discovery.""" published_date = vuln.get("published", "") if not published_date: return 365 # Default to old if unknown try: pub_date = datetime.fromisoformat(published_date.replace("Z", "+00:00")) return (datetime.utcnow() - pub_date.replace(tzinfo=None)).days except: return 365 def _get_mock_exploit_data(self, cve_id: str) -> Dict[str, Any]: """Get mock exploit data (replace with real threat intel APIs).""" # This would integrate with external threat intelligence APIs # For now, providing mock data based on CVE patterns mock_data = { "exploit_available": False, "maturity": "unknown", "complexity": "unknown", "public_exploits": 0, "cvss_score": 0.0, "threat_intel": {} } # Mock some patterns for demonstration if "2023" in cve_id or "2024" in cve_id: mock_data["exploit_available"] = True mock_data["maturity"] = "proof-of-concept" mock_data["public_exploits"] = 2 mock_data["cvss_score"] = 7.5 return mock_data def _generate_vulnerability_overview(self, vulns: List[Dict[str, Any]]) -> Dict[str, Any]: """Generate vulnerability overview statistics.""" if not vulns: return {"total": 0, "message": "No vulnerabilities found"} severity_counts = Counter(v.get("severity", "unknown").lower() for v in vulns) risk_scores = [self._calculate_risk_score(v) for v in vulns] return { "total_vulnerabilities": len(vulns), "severity_distribution": dict(severity_counts), "risk_metrics": { "average_risk_score": sum(risk_scores) / len(risk_scores), "highest_risk_score": max(risk_scores), "critical_risk_count": sum(1 for score in risk_scores if score >= 80) }, "business_impact_distribution": Counter( self._assess_business_impact(v) for v in vulns ) } def _assess_infrastructure_risk(self, vulns: List[Dict[str, Any]]) -> Dict[str, Any]: """Assess overall infrastructure risk.""" if not vulns: return {"level": "low", "score": 0} risk_scores = [self._calculate_risk_score(v) for v in vulns] avg_risk = sum(risk_scores) / len(risk_scores) critical_count = sum(1 for score in risk_scores if score >= 80) # Calculate infrastructure risk level if critical_count > 10 or avg_risk > 70: level = "critical" elif critical_count > 5 or avg_risk > 50: level = "high" elif critical_count > 0 or avg_risk > 30: level = "medium" else: level = "low" return { "level": level, "score": int(avg_risk), "critical_vulnerabilities": critical_count, "recommendations": self._get_risk_recommendations(level, critical_count) } def _analyze_vulnerability_distribution(self, vulns: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze how vulnerabilities are distributed.""" agent_counts = Counter(v.get("agent", {}).get("id", "unknown") for v in vulns) package_counts = Counter(v.get("name", "unknown") for v in vulns) return { "by_agent": dict(agent_counts.most_common(10)), "by_package": dict(package_counts.most_common(10)), "most_affected_agent": agent_counts.most_common(1)[0] if agent_counts else None, "most_vulnerable_package": package_counts.most_common(1)[0] if package_counts else None } def _analyze_affected_systems(self, vulns: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze which systems are most affected.""" systems = defaultdict(list) for vuln in vulns: agent_info = vuln.get("agent", {}) agent_id = agent_info.get("id", "unknown") agent_name = agent_info.get("name", agent_id) systems[agent_id].append({ "vulnerability": vuln.get("cve", vuln.get("name", "unknown")), "severity": vuln.get("severity", "unknown"), "risk_score": self._calculate_risk_score(vuln) }) # Calculate system risk scores system_risks = {} for agent_id, agent_vulns in systems.items(): avg_risk = sum(v["risk_score"] for v in agent_vulns) / len(agent_vulns) system_risks[agent_id] = { "vulnerability_count": len(agent_vulns), "average_risk_score": avg_risk, "critical_count": sum(1 for v in agent_vulns if v["risk_score"] >= 80) } # Sort by risk sorted_systems = sorted( system_risks.items(), key=lambda x: (x[1]["critical_count"], x[1]["average_risk_score"]), reverse=True ) return { "total_affected_systems": len(systems), "highest_risk_systems": dict(sorted_systems[:10]), "systems_needing_immediate_attention": len([ s for s in system_risks.values() if s["critical_count"] > 0 ]) } def _analyze_vulnerable_packages(self, vulns: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze vulnerable packages and versions.""" package_analysis = defaultdict(lambda: { "versions": set(), "severities": set(), "agents_affected": set(), "total_vulns": 0 }) for vuln in vulns: package_name = vuln.get("name", "unknown") version = vuln.get("version", "unknown") severity = vuln.get("severity", "unknown") agent_id = vuln.get("agent", {}).get("id", "unknown") pkg_data = package_analysis[package_name] pkg_data["versions"].add(version) pkg_data["severities"].add(severity) pkg_data["agents_affected"].add(agent_id) pkg_data["total_vulns"] += 1 # Convert sets to lists for JSON serialization for pkg_data in package_analysis.values(): pkg_data["versions"] = list(pkg_data["versions"]) pkg_data["severities"] = list(pkg_data["severities"]) pkg_data["agents_affected"] = list(pkg_data["agents_affected"]) # Sort by impact sorted_packages = sorted( package_analysis.items(), key=lambda x: (x[1]["total_vulns"], len(x[1]["agents_affected"])), reverse=True ) return { "total_unique_packages": len(package_analysis), "most_problematic_packages": dict(sorted_packages[:10]), "widespread_packages": [ pkg for pkg, data in package_analysis.items() if len(data["agents_affected"]) > 1 ] } def _analyze_vulnerability_timeline(self, vulns: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze vulnerability discovery timeline.""" timeline = defaultdict(int) for vuln in vulns: published_date = vuln.get("published", "") if published_date: try: pub_date = datetime.fromisoformat(published_date.replace("Z", "+00:00")) month_key = pub_date.strftime("%Y-%m") timeline[month_key] += 1 except: timeline["unknown"] += 1 else: timeline["unknown"] += 1 # Find recent surge sorted_timeline = sorted(timeline.items()) recent_surge = False if len(sorted_timeline) >= 2: latest = sorted_timeline[-1][1] previous = sorted_timeline[-2][1] if latest > previous * 1.5: recent_surge = True return { "monthly_distribution": dict(timeline), "recent_surge_detected": recent_surge, "peak_month": max(timeline.items(), key=lambda x: x[1]) if timeline else None } def _generate_remediation_recommendations(self, vulns: List[Dict[str, Any]]) -> Dict[str, Any]: """Generate remediation recommendations.""" recommendations = { "immediate_actions": [], "short_term_actions": [], "long_term_actions": [], "patching_strategy": {} } # Analyze for immediate actions critical_vulns = [v for v in vulns if self._calculate_risk_score(v) >= 80] if critical_vulns: recommendations["immediate_actions"].append({ "action": "Patch critical vulnerabilities immediately", "count": len(critical_vulns), "timeline": "24-48 hours" }) # Package-based recommendations package_counts = Counter(v.get("name", "") for v in vulns) for package, count in package_counts.most_common(5): if count > 1: recommendations["short_term_actions"].append({ "action": f"Update {package} package across all systems", "affected_systems": count, "timeline": "1-2 weeks" }) # Long-term recommendations recommendations["long_term_actions"] = [ "Implement automated vulnerability scanning", "Establish regular patching cycles", "Deploy vulnerability management system", "Create incident response procedures" ] return recommendations def _assess_compliance_impact(self, vulns: List[Dict[str, Any]]) -> Dict[str, Any]: """Assess compliance impact of vulnerabilities.""" high_risk_vulns = [v for v in vulns if self._calculate_risk_score(v) >= 60] compliance_risks = { "PCI_DSS": len([v for v in high_risk_vulns if self._affects_pci_compliance(v)]), "HIPAA": len([v for v in high_risk_vulns if self._affects_hipaa_compliance(v)]), "SOX": len([v for v in high_risk_vulns if self._affects_sox_compliance(v)]), "GDPR": len([v for v in high_risk_vulns if self._affects_gdpr_compliance(v)]) } return { "compliance_risks": compliance_risks, "high_risk_vulnerabilities": len(high_risk_vulns), "remediation_urgency": "high" if any(compliance_risks.values()) else "medium" } def _get_risk_recommendations(self, level: str, critical_count: int) -> List[str]: """Get risk-based recommendations.""" if level == "critical": return [ "Implement emergency patching procedures", "Consider isolating critical systems", "Activate incident response team", "Schedule immediate vulnerability assessment" ] elif level == "high": return [ "Prioritize critical vulnerability patching", "Increase monitoring frequency", "Review security controls" ] else: return [ "Continue regular patching schedule", "Monitor for new vulnerabilities" ] def _analyze_exploit_landscape(self, vulns: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze exploit landscape for critical vulnerabilities.""" with_exploits = [v for v in vulns if v.get("exploit_available", False)] exploit_maturity = Counter(v.get("exploit_maturity", "unknown") for v in with_exploits) exploit_complexity = Counter(v.get("exploit_complexity", "unknown") for v in with_exploits) return { "total_with_exploits": len(with_exploits), "exploit_maturity_distribution": dict(exploit_maturity), "exploit_complexity_distribution": dict(exploit_complexity), "weaponized_exploits": len([v for v in with_exploits if v.get("exploit_maturity") == "weaponized"]) } def _analyze_attack_vectors(self, vulns: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze potential attack vectors.""" vectors = { "network": 0, "local": 0, "physical": 0, "adjacent_network": 0 } for vuln in vulns: cvss = vuln.get("cvss", {}) if isinstance(cvss, dict): vector = cvss.get("cvss3", {}).get("attackVector", "").lower() if "network" in vector: vectors["network"] += 1 elif "local" in vector: vectors["local"] += 1 elif "physical" in vector: vectors["physical"] += 1 elif "adjacent" in vector: vectors["adjacent_network"] += 1 return vectors def _generate_immediate_actions(self, vulns: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Generate immediate action items.""" actions = [] # Top 5 highest priority vulnerabilities top_vulns = sorted(vulns, key=lambda x: x.get("priority_score", 0), reverse=True)[:5] for vuln in top_vulns: actions.append({ "vulnerability": vuln.get("cve", vuln.get("name", "unknown")), "priority_score": vuln.get("priority_score", 0), "action": f"Patch {vuln.get('name', 'package')} immediately", "affected_systems": [vuln.get("agent", {}).get("id", "unknown")], "timeline": "24 hours" if vuln.get("priority_score", 0) > 90 else "48-72 hours" }) return actions def _analyze_vulnerability_trends(self, vulns: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze vulnerability trends.""" # Group by discovery date monthly_counts = defaultdict(int) for vuln in vulns: days_old = vuln.get("days_since_discovery", 365) if days_old <= 30: monthly_counts["last_30_days"] += 1 elif days_old <= 60: monthly_counts["30_60_days"] += 1 elif days_old <= 90: monthly_counts["60_90_days"] += 1 else: monthly_counts["older"] += 1 # Determine trend recent = monthly_counts["last_30_days"] older = monthly_counts["30_60_days"] if recent > older * 1.5: trend = "increasing" elif recent < older * 0.5: trend = "decreasing" else: trend = "stable" return { "distribution": dict(monthly_counts), "trend": trend, "recent_discoveries": recent } # Mock compliance check methods def _affects_pci_compliance(self, vuln: Dict[str, Any]) -> bool: """Check if vulnerability affects PCI DSS compliance.""" # Mock implementation - would check against actual compliance requirements package_name = vuln.get("name", "").lower() return any(pkg in package_name for pkg in ["apache", "nginx", "mysql", "postgresql"]) def _affects_hipaa_compliance(self, vuln: Dict[str, Any]) -> bool: """Check if vulnerability affects HIPAA compliance.""" # Mock implementation severity = vuln.get("severity", "").lower() return severity in ["critical", "high"] def _affects_sox_compliance(self, vuln: Dict[str, Any]) -> bool: """Check if vulnerability affects SOX compliance.""" # Mock implementation return self._calculate_risk_score(vuln) >= 70 def _affects_gdpr_compliance(self, vuln: Dict[str, Any]) -> bool: """Check if vulnerability affects GDPR compliance.""" # Mock implementation return vuln.get("exploit_available", False)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gensecaihq/Wazuh-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server