Skip to main content
Glama
wazuh_client.py18 kB
"""Wazuh API client optimized for Wazuh 4.8.0 to 4.14.1 compatibility with latest features.""" import asyncio import json import time from typing import Dict, Any, Optional import httpx import logging from wazuh_mcp_server.config import WazuhConfig from wazuh_mcp_server.resilience import ( CircuitBreaker, CircuitBreakerConfig, RetryConfig ) logger = logging.getLogger(__name__) class WazuhClient: """Simplified Wazuh API client with rate limiting, circuit breaker, and retry logic.""" def __init__(self, config: WazuhConfig): self.config = config self.token: Optional[str] = None self.client: Optional[httpx.AsyncClient] = None # Rate limiting self._rate_limiter = asyncio.Semaphore(config.max_connections) self._request_times = [] self._max_requests_per_minute = getattr(config, 'max_requests_per_minute', 100) self._rate_limit_enabled = True # Circuit breaker for API resilience circuit_config = CircuitBreakerConfig( failure_threshold=5, recovery_timeout=60, expected_exception=Exception ) self._circuit_breaker = CircuitBreaker(circuit_config) logger.info("WazuhClient initialized with circuit breaker and retry logic") async def initialize(self): """Initialize the HTTP client and authenticate.""" self.client = httpx.AsyncClient( verify=self.config.verify_ssl, timeout=self.config.request_timeout_seconds ) await self._authenticate() async def _authenticate(self): """Authenticate with Wazuh API.""" auth_url = f"{self.config.base_url}/security/user/authenticate" try: response = await self.client.post( auth_url, auth=(self.config.wazuh_user, self.config.wazuh_pass) ) response.raise_for_status() data = response.json() if "data" not in data or "token" not in data["data"]: raise ValueError("Invalid authentication response from Wazuh API") self.token = data["data"]["token"] print(f"✅ Authenticated with Wazuh server at {self.config.wazuh_host}") except httpx.ConnectError: raise ConnectionError(f"Cannot connect to Wazuh server at {self.config.wazuh_host}:{self.config.wazuh_port}") except httpx.TimeoutException: raise ConnectionError(f"Connection timeout to Wazuh server at {self.config.wazuh_host}") except httpx.HTTPStatusError as e: if e.response.status_code == 401: raise ValueError("Invalid Wazuh credentials. Check WAZUH_USER and WAZUH_PASS") elif e.response.status_code == 403: raise ValueError("Wazuh user does not have sufficient permissions") else: raise ValueError(f"Wazuh API error: {e.response.status_code} - {e.response.text}") async def get_alerts(self, **params) -> Dict[str, Any]: """Get alerts from Wazuh.""" return await self._request("GET", "/alerts", params=params) async def get_agents(self, **params) -> Dict[str, Any]: """Get agents from Wazuh.""" return await self._request("GET", "/agents", params=params) async def get_vulnerabilities(self, **params) -> Dict[str, Any]: """Get vulnerabilities from Wazuh Indexer (4.8.0-4.14.1 supported, uses centralized vulnerability detection).""" # Note: /vulnerability endpoint was deprecated in 4.7.0 and removed in 4.8.0 # 4.12+ includes package condition fields and CTI references # 4.14.x maintains API compatibility with enhanced vulnerability data return await self._request("GET", "/vulnerability/agents", params=params) async def get_cluster_status(self) -> Dict[str, Any]: """Get cluster status.""" return await self._request("GET", "/cluster/status") async def search_logs(self, **params) -> Dict[str, Any]: """Search logs with advanced filtering capabilities.""" return await self._request("GET", "/manager/logs", params=params) async def get_incidents(self, **params) -> Dict[str, Any]: """Get security incidents.""" return await self._request("GET", "/security/incidents", params=params) async def create_incident(self, data: Dict[str, Any]) -> Dict[str, Any]: """Create a new security incident.""" return await self._request("POST", "/security/incidents", json=data) async def update_incident(self, incident_id: str, data: Dict[str, Any]) -> Dict[str, Any]: """Update an existing security incident.""" return await self._request("PUT", f"/security/incidents/{incident_id}", json=data) async def get_rules(self, **params) -> Dict[str, Any]: """Get Wazuh detection rules.""" return await self._request("GET", "/rules", params=params) async def get_rule_info(self, rule_id: str) -> Dict[str, Any]: """Get detailed information about a specific rule.""" return await self._request("GET", f"/rules/{rule_id}") async def get_decoders(self, **params) -> Dict[str, Any]: """Get Wazuh log decoders.""" return await self._request("GET", "/decoders", params=params) async def execute_active_response(self, data: Dict[str, Any]) -> Dict[str, Any]: """Execute active response command on agents (4.8+ removed 'custom' parameter).""" # Note: 'custom' parameter was removed in Wazuh 4.8.0 # Ensure data dict doesn't contain deprecated 'custom' parameter if 'custom' in data: data = {k: v for k, v in data.items() if k != 'custom'} return await self._request("PUT", "/active-response", json=data) async def get_active_response_commands(self, **params) -> Dict[str, Any]: """Get available active response commands.""" return await self._request("GET", "/manager/configuration", params={"section": "active-response"}) async def get_cdb_lists(self, **params) -> Dict[str, Any]: """Get CDB lists.""" return await self._request("GET", "/lists", params=params) async def get_cdb_list_content(self, filename: str) -> Dict[str, Any]: """Get specific CDB list content.""" return await self._request("GET", f"/lists/{filename}") async def get_fim_events(self, **params) -> Dict[str, Any]: """Get File Integrity Monitoring events.""" return await self._request("GET", "/syscheck", params=params) async def get_syscollector_info(self, agent_id: str, **params) -> Dict[str, Any]: """Get system inventory information from agent.""" return await self._request("GET", f"/syscollector/{agent_id}", params=params) async def get_manager_stats(self, **params) -> Dict[str, Any]: """Get manager statistics.""" return await self._request("GET", "/manager/stats", params=params) async def get_manager_version_check(self) -> Dict[str, Any]: """Check for new Wazuh releases (4.8+ feature).""" return await self._request("GET", "/manager/version/check") async def get_cti_data(self, cve_id: str) -> Dict[str, Any]: """Get Cyber Threat Intelligence data for CVE (4.12-4.14.1 feature).""" return await self._request("GET", f"/vulnerability/cti/{cve_id}") async def get_vulnerability_details(self, vuln_id: str, **params) -> Dict[str, Any]: """Get detailed vulnerability information including CTI references (4.12-4.14.1 enhanced).""" return await self._request("GET", f"/vulnerability/{vuln_id}", params=params) async def get_agent_stats(self, agent_id: str, component: str = "logcollector") -> Dict[str, Any]: """Get agent component statistics.""" return await self._request("GET", f"/agents/{agent_id}/stats/{component}") async def _rate_limit_check(self): """Check and enforce rate limiting.""" current_time = time.time() # Remove requests older than 1 minute self._request_times = [t for t in self._request_times if current_time - t < 60] # Check if we're hitting the rate limit if len(self._request_times) >= self._max_requests_per_minute: # Calculate how long to wait before the oldest request expires oldest_request_time = self._request_times[0] sleep_time = 60 - (current_time - oldest_request_time) if sleep_time > 0: print(f"⚠️ Rate limit reached ({self._max_requests_per_minute}/min). Waiting {sleep_time:.1f}s...") await asyncio.sleep(sleep_time) # Clean up expired requests after waiting current_time = time.time() self._request_times = [t for t in self._request_times if current_time - t < 60] # Record this request time self._request_times.append(current_time) async def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]: """Make authenticated request to Wazuh API with rate limiting, circuit breaker, and retry logic.""" # Apply rate limiting async with self._rate_limiter: await self._rate_limit_check() # Apply circuit breaker and retry logic return await self._request_with_resilience(method, endpoint, **kwargs) @RetryConfig.WAZUH_API_RETRY async def _request_with_resilience(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]: """Execute request with circuit breaker and retry logic.""" return await self._circuit_breaker._call(self._execute_request, method, endpoint, **kwargs) async def _execute_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]: """Execute the actual HTTP request to Wazuh API.""" if not self.token: await self._authenticate() url = f"{self.config.base_url}{endpoint}" headers = {"Authorization": f"Bearer {self.token}"} try: response = await self.client.request(method, url, headers=headers, **kwargs) response.raise_for_status() data = response.json() # Validate response structure if "data" not in data: raise ValueError(f"Invalid response structure from Wazuh API: {endpoint}") return data except httpx.HTTPStatusError as e: if e.response.status_code == 401: # Token might be expired, try to re-authenticate self.token = None await self._authenticate() # Retry the request once headers = {"Authorization": f"Bearer {self.token}"} response = await self.client.request(method, url, headers=headers, **kwargs) response.raise_for_status() return response.json() else: logger.error(f"Wazuh API request failed: {e.response.status_code} - {e.response.text}") raise ValueError(f"Wazuh API request failed: {e.response.status_code} - {e.response.text}") except httpx.ConnectError as e: logger.error(f"Lost connection to Wazuh server at {self.config.wazuh_host}") raise ConnectionError(f"Lost connection to Wazuh server at {self.config.wazuh_host}") except httpx.TimeoutException as e: logger.error(f"Request timeout to Wazuh server") raise ConnectionError(f"Request timeout to Wazuh server") async def get_manager_info(self) -> Dict[str, Any]: """Get Wazuh manager information.""" return await self._request("GET", "/") async def get_alert_summary(self, time_range: str, group_by: str) -> Dict[str, Any]: """Get alert summary grouped by field.""" params = {"time_range": time_range, "group_by": group_by} return await self._request("GET", "/alerts/summary", params=params) async def analyze_alert_patterns(self, time_range: str, min_frequency: int) -> Dict[str, Any]: """Analyze alert patterns.""" params = {"time_range": time_range, "min_frequency": min_frequency} return await self._request("GET", "/alerts/patterns", params=params) async def search_security_events(self, query: str, time_range: str, limit: int) -> Dict[str, Any]: """Search security events.""" params = {"q": query, "time_range": time_range, "limit": limit} return await self._request("GET", "/security/events", params=params) async def get_running_agents(self) -> Dict[str, Any]: """Get running agents.""" return await self._request("GET", "/agents", params={"status": "active"}) async def check_agent_health(self, agent_id: str) -> Dict[str, Any]: """Check agent health.""" return await self._request("GET", f"/agents/{agent_id}/health") async def get_agent_processes(self, agent_id: str, limit: int) -> Dict[str, Any]: """Get agent processes.""" return await self._request("GET", f"/syscollector/{agent_id}/processes", params={"limit": limit}) async def get_agent_ports(self, agent_id: str, limit: int) -> Dict[str, Any]: """Get agent ports.""" return await self._request("GET", f"/syscollector/{agent_id}/ports", params={"limit": limit}) async def get_agent_configuration(self, agent_id: str) -> Dict[str, Any]: """Get agent configuration.""" return await self._request("GET", f"/agents/{agent_id}/config") async def get_critical_vulnerabilities(self, limit: int) -> Dict[str, Any]: """Get critical vulnerabilities.""" return await self._request("GET", "/vulnerability/agents", params={"severity": "critical", "limit": limit}) async def get_vulnerability_summary(self, time_range: str) -> Dict[str, Any]: """Get vulnerability summary.""" return await self._request("GET", "/vulnerability/summary", params={"time_range": time_range}) async def analyze_security_threat(self, indicator: str, indicator_type: str) -> Dict[str, Any]: """Analyze security threat.""" data = {"indicator": indicator, "type": indicator_type} return await self._request("POST", "/security/threat/analyze", json=data) async def check_ioc_reputation(self, indicator: str, indicator_type: str) -> Dict[str, Any]: """Check IoC reputation.""" params = {"indicator": indicator, "type": indicator_type} return await self._request("GET", "/security/ioc/reputation", params=params) async def perform_risk_assessment(self, agent_id: str = None) -> Dict[str, Any]: """Perform risk assessment.""" endpoint = f"/security/risk/{agent_id}" if agent_id else "/security/risk" return await self._request("GET", endpoint) async def get_top_security_threats(self, limit: int, time_range: str) -> Dict[str, Any]: """Get top security threats.""" params = {"limit": limit, "time_range": time_range} return await self._request("GET", "/security/threats/top", params=params) async def generate_security_report(self, report_type: str, include_recommendations: bool) -> Dict[str, Any]: """Generate security report.""" data = {"type": report_type, "include_recommendations": include_recommendations} return await self._request("POST", "/security/reports/generate", json=data) async def run_compliance_check(self, framework: str, agent_id: str = None) -> Dict[str, Any]: """Run compliance check.""" data = {"framework": framework} if agent_id: data["agent_id"] = agent_id return await self._request("POST", "/security/compliance/check", json=data) async def get_wazuh_statistics(self) -> Dict[str, Any]: """Get Wazuh statistics.""" return await self._request("GET", "/manager/stats/all") async def get_weekly_stats(self) -> Dict[str, Any]: """Get weekly statistics.""" return await self._request("GET", "/manager/stats/weekly") async def get_cluster_health(self) -> Dict[str, Any]: """Get cluster health.""" return await self._request("GET", "/cluster/health") async def get_cluster_nodes(self) -> Dict[str, Any]: """Get cluster nodes.""" return await self._request("GET", "/cluster/nodes") async def get_rules_summary(self) -> Dict[str, Any]: """Get rules summary.""" return await self._request("GET", "/rules/summary") async def get_remoted_stats(self) -> Dict[str, Any]: """Get remoted statistics.""" return await self._request("GET", "/manager/stats/remoted") async def get_log_collector_stats(self) -> Dict[str, Any]: """Get log collector statistics.""" return await self._request("GET", "/manager/stats/logcollector") async def search_manager_logs(self, query: str, limit: int) -> Dict[str, Any]: """Search manager logs.""" params = {"q": query, "limit": limit} return await self._request("GET", "/manager/logs", params=params) async def get_manager_error_logs(self, limit: int) -> Dict[str, Any]: """Get manager error logs.""" params = {"level": "error", "limit": limit} return await self._request("GET", "/manager/logs", params=params) async def validate_connection(self) -> Dict[str, Any]: """Validate Wazuh connection.""" try: result = await self._request("GET", "/") return {"status": "connected", "details": result} except Exception as e: return {"status": "failed", "error": str(e)} async def close(self): """Close the HTTP client.""" if self.client: await self.client.aclose()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gensecaihq/Wazuh-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server