Wazuh MCP Server

by unmuktoai
Verified
#!/usr/bin/env python3 """ Wazuh MCP Server for Claude Desktop Integration ------------------------------------------------- This service authenticates with the Wazuh API using JWT, retrieves alert data, transforms it into MCP-compliant JSON messages, and exposes an HTTP endpoint (/mcp) for Claude Desktop to fetch real-time security context. """ import os import requests import json import logging import datetime from flask import Flask, jsonify, request from typing import Dict, Any, Optional # Configure logging for production-grade observability. logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s" ) logger = logging.getLogger(__name__) # Configuration via environment variables WAZUH_HOST = os.getenv("WAZUH_HOST", "localhost") WAZUH_PORT = int(os.getenv("WAZUH_PORT", "55000")) WAZUH_USER = os.getenv("WAZUH_USER", "admin") WAZUH_PASS = os.getenv("WAZUH_PASS", "admin") VERIFY_SSL = os.getenv("VERIFY_SSL", "false").lower() == "true" MCP_SERVER_PORT = int(os.getenv("MCP_SERVER_PORT", "8000")) app = Flask(__name__) class WazuhAPIClient: def __init__(self, host: str, port: int, username: str, password: str, verify_ssl: bool = True): self.host = host self.port = port self.username = username self.password = password self.verify_ssl = verify_ssl self.base_url = f"https://{host}:{port}" self.jwt_token: Optional[str] = None self.jwt_expiration: Optional[datetime.datetime] = None self.auth_endpoint = "/security/user/authenticate" def _is_jwt_valid(self) -> bool: if not self.jwt_token or not self.jwt_expiration: return False # Consider token invalid if it expires in less than 60 seconds. remaining = (self.jwt_expiration - datetime.datetime.utcnow()).total_seconds() return remaining > 60 def get_jwt(self) -> str: if self._is_jwt_valid(): return self.jwt_token auth_url = f"{self.base_url}{self.auth_endpoint}" try: logger.info("Requesting new JWT token from %s", auth_url) response = requests.post( auth_url, auth=(self.username, self.password), verify=self.verify_ssl, timeout=10 ) response.raise_for_status() data = response.json() token = data.get("jwt") if not token: raise ValueError("JWT token not found in response") self.jwt_token = token # In production, decode the JWT for an accurate expiration timestamp. self.jwt_expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=5) logger.info("Obtained new JWT token valid until %s", self.jwt_expiration.isoformat()) return self.jwt_token except Exception as e: logger.error("Error obtaining JWT token: %s", str(e)) raise def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]: jwt_token = self.get_jwt() headers = kwargs.pop("headers", {}) headers["Authorization"] = f"Bearer {jwt_token}" url = f"{self.base_url}{endpoint}" try: response = requests.request( method, url, headers=headers, verify=self.verify_ssl, timeout=10, **kwargs ) if response.status_code == 401: logger.warning("JWT expired. Re-authenticating and retrying request.") self.jwt_token = None jwt_token = self.get_jwt() headers["Authorization"] = f"Bearer {jwt_token}" response = requests.request( method, url, headers=headers, verify=self.verify_ssl, timeout=10, **kwargs ) response.raise_for_status() return response.json() except Exception as e: logger.error("Request to %s failed: %s", endpoint, str(e)) raise def get_alerts(self, index_pattern: str = "wazuh-alerts-*", query: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: endpoint = f"/{index_pattern}/_search" if query is None: query = {"query": {"match_all": {}}} try: logger.info("Retrieving alerts with index pattern '%s'", index_pattern) data = self._make_request("GET", endpoint, json=query) return data except Exception as e: logger.error("Error retrieving alerts: %s", str(e)) raise def transform_to_mcp(event: Dict[str, Any], event_type: str = "alert") -> Dict[str, Any]: """ Transform a Wazuh event into an MCP message. """ mcp_message = { "protocol_version": "1.0", "source": "Wazuh", "timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), "event_type": event_type, "context": { "id": event.get("id", "unknown"), "category": event.get("category", "intrusion_detection"), "severity": event.get("severity", "unknown"), "description": event.get("description", ""), "data": event.get("data", {}) }, "metadata": { "integration": "Wazuh-MCP", "notes": event.get("notes", "Data fetched via Wazuh API") } } return mcp_message # Instantiate Wazuh API client with environment configurations. wazuh_client = WazuhAPIClient(WAZUH_HOST, WAZUH_PORT, WAZUH_USER, WAZUH_PASS, verify_ssl=VERIFY_SSL) @app.route('/mcp', methods=['GET']) def mcp_endpoint(): """ MCP endpoint for Claude Desktop. Retrieves the latest Wazuh alerts, converts them into MCP messages, and returns as JSON. """ try: alert_query = {"query": {"match_all": {}}} alerts_data = wazuh_client.get_alerts(query=alert_query) hits = alerts_data.get("hits", {}).get("hits", []) mcp_messages = [transform_to_mcp(hit.get("_source", {}), event_type="alert") for hit in hits] return jsonify(mcp_messages), 200 except Exception as e: logger.error("Error in /mcp endpoint: %s", str(e)) return jsonify({"error": str(e)}), 500 if __name__ == '__main__': logger.info("Starting Wazuh MCP Server on port %s", MCP_SERVER_PORT) app.run(host="0.0.0.0", port=MCP_SERVER_PORT)