wazuh-mcp-server•3.41 kB
#!/usr/bin/env python3
"""
Wazuh MCP Server v2.1.2 - FastMCP STDIO Executable
==================================================
Production-ready FastMCP server for Wazuh SIEM integration.
Pure STDIO transport for Claude Desktop - no Docker required.
"""
import sys
import os
import asyncio
import argparse
import signal
import logging
from pathlib import Path
# Make the project's "src" directory importable. It sits beside this script's
# own directory, and is placed at the front of sys.path so it is searched first.
src_path = Path(__file__).parent.parent.joinpath("src")
sys.path[:0] = [str(src_path)]
def setup_signal_handlers():
    """Install one shared exit handler for the usual termination signals.

    SIGTERM and SIGINT are always registered. SIGHUP is registered only when
    the ``signal`` module defines it on the current platform. The handler
    writes a notice to stderr and then calls ``sys.exit(0)``.
    """

    def _exit_on_signal(signum, _frame):
        print(f"\n🛑 Received signal {signum} - initiating graceful shutdown...", file=sys.stderr)
        # sys.exit raises SystemExit in the main thread; any further cleanup
        # is left to whatever code is unwinding at that point.
        sys.exit(0)

    # Signals every supported platform provides.
    handled = [signal.SIGTERM, signal.SIGINT]
    # SIGHUP is not defined everywhere, so only add it when present.
    if hasattr(signal, 'SIGHUP'):
        handled.append(signal.SIGHUP)

    for signo in handled:
        signal.signal(signo, _exit_on_signal)
def main():
    """Parse the command line, initialise the server, and serve over STDIO.

    Command-line flags:
        --stdio         Accepted for compatibility; always true and never read.
        --version       Print the version string and exit.
        --health-check  Exit with status 0 right after successful
                        initialisation instead of starting the transport.

    Exit status: 1 when startup or serving raises ImportError, RuntimeError,
    or any other Exception; 0 on KeyboardInterrupt or a successful health
    check. All status and error messages are written to stderr.
    """

    def report(message):
        # Every status line goes to stderr (stdout is presumably reserved
        # for the STDIO transport - confirm against FastMCP).
        print(message, file=sys.stderr)

    cli = argparse.ArgumentParser(description="Wazuh MCP Server - FastMCP STDIO Implementation")
    cli.add_argument("--stdio", action="store_true", default=True,
                     help="Run in STDIO mode (default and only supported mode)")
    cli.add_argument("--version", action="version",
                     version="Wazuh MCP Server 2.1.2 (FastMCP STDIO Only)")
    cli.add_argument("--health-check", action="store_true",
                     help="Run health checks and exit")
    options = cli.parse_args()

    # Handlers are installed before the server package is imported or started.
    setup_signal_handlers()

    try:
        # Imported here so a missing dependency is reported by the ImportError
        # branch below rather than crashing at module import time.
        from wazuh_mcp_server.server import mcp, initialize_server

        report("🚀 Starting Wazuh MCP Server v2.1.2 with FastMCP (STDIO mode)...")

        # NOTE(review): asyncio.run() closes its event loop once
        # initialize_server() returns; confirm that anything it creates does
        # not need that loop later under mcp.run().
        asyncio.run(initialize_server())
        report("✅ Server initialization completed - ready for connections")

        if not options.health_check:
            report("📡 Starting FastMCP STDIO transport...")
            mcp.run(transport="stdio")
        else:
            # Health-check mode: successful initialisation is the whole job.
            report("✅ Health check completed successfully")
            sys.exit(0)
    except KeyboardInterrupt:
        report("\n🛑 Server shutdown by user")
        sys.exit(0)
    except ImportError as e:
        report(f"❌ Error importing server components: {e}")
        report("💡 Make sure all dependencies are installed: pip install -r requirements.txt")
        sys.exit(1)
    except RuntimeError as e:
        report(f"❌ Server startup failed: {e}")
        report("💡 Check the health check failure report for details")
        sys.exit(1)
    except Exception as e:
        report(f"❌ Unexpected error starting server: {e}")
        report("💡 Enable debug logging for more details")
        sys.exit(1)
# Start the server only when this file is executed as a script; importing it
# as a module does not call main().
if __name__ == "__main__":
    main()