Skip to main content
Glama
security.py (8.02 kB)
"""Security and SIEM FastMCP tools for Ludus MCP server.""" from typing import Any from fastmcp import FastMCP from ludus_mcp.core.client import LudusAPIClient from ludus_mcp.server.handlers.siem import SIEMHandler from ludus_mcp.server.handlers.wazuh import WazuhHandler from ludus_mcp.server.handlers.access import AccessHandler from ludus_mcp.server.handlers.security_compliance import SecurityComplianceHandler from ludus_mcp.server.tools.utils import LazyHandlerRegistry, format_tool_response def create_security_tools(client: LudusAPIClient) -> FastMCP: """Create security and SIEM tools. Args: client: Ludus API client Returns: FastMCP instance with security tools registered """ mcp = FastMCP("Security & SIEM") registry = LazyHandlerRegistry(client) # ==================== SIEM TOOLS ==================== @mcp.tool() async def get_siem_info(user_id: str | None = None) -> dict: """Get SIEM information for the range. Args: user_id: Optional user ID (admin only) Returns: SIEM configuration and status """ handler = registry.get_handler("siem", SIEMHandler) result = await handler.get_siem_info(user_id) return format_tool_response(result) @mcp.tool() async def get_siem_alerts( user_id: str | None = None, severity: str | None = None, limit: int = 100 ) -> list[dict]: """Get SIEM alerts for the range. Args: user_id: Optional user ID (admin only) severity: Filter by severity level (low, medium, high, critical) limit: Maximum number of alerts to return Returns: List of SIEM alerts """ handler = registry.get_handler("siem", SIEMHandler) result = await handler.get_siem_alerts(user_id, severity, limit) return format_tool_response(result) @mcp.tool() async def get_siem_agents(user_id: str | None = None) -> list[dict]: """Get SIEM agents for the range. 
Args: user_id: Optional user ID (admin only) Returns: List of SIEM agents """ handler = registry.get_handler("siem", SIEMHandler) result = await handler.get_siem_agents(user_id) return format_tool_response(result) @mcp.tool() async def get_detection_summary(user_id: str | None = None) -> dict: """Get detection summary from SIEM. Args: user_id: Optional user ID (admin only) Returns: Detection summary with statistics """ handler = registry.get_handler("siem", SIEMHandler) result = await handler.get_detection_summary(user_id) return format_tool_response(result) # ==================== WAZUH TOOLS ==================== @mcp.tool() async def get_wazuh_info(user_id: str | None = None) -> dict: """Get Wazuh SIEM information for the range. Args: user_id: Optional user ID (admin only) Returns: Wazuh configuration and status """ handler = registry.get_handler("wazuh", WazuhHandler) result = await handler.get_wazuh_info(user_id) return format_tool_response(result) # ==================== ACCESS CONTROL TOOLS ==================== @mcp.tool() async def get_range_access(user_id: str | None = None) -> dict: """Get range access configuration. Args: user_id: Optional user ID (admin only) Returns: Range access configuration """ handler = registry.get_handler("access", AccessHandler) result = await handler.get_range_access(user_id) return format_tool_response(result) @mcp.tool() async def grant_range_access( target_user_id: str, permissions: list[str], user_id: str | None = None ) -> dict: """Grant access to range for another user. 
Args: target_user_id: User ID to grant access to permissions: List of permissions to grant (read, write, admin) user_id: Optional user ID (admin only) Returns: Grant result """ handler = registry.get_handler("access", AccessHandler) result = await handler.grant_range_access(target_user_id, permissions, user_id) return format_tool_response(result) @mcp.tool() async def revoke_range_access( target_user_id: str, user_id: str | None = None ) -> dict: """Revoke range access from a user. Args: target_user_id: User ID to revoke access from user_id: Optional user ID (admin only) Returns: Revoke result """ handler = registry.get_handler("access", AccessHandler) result = await handler.revoke_range_access(target_user_id, user_id) return format_tool_response(result) @mcp.tool() async def clear_range_access(user_id: str | None = None) -> dict: """Clear all range access permissions. Args: user_id: Optional user ID (admin only) Returns: Clear result """ handler = registry.get_handler("access", AccessHandler) result = await handler.clear_range_access(user_id) return format_tool_response(result) @mcp.tool() async def range_access_logs( user_id: str | None = None, limit: int = 100 ) -> list[dict]: """Get range access logs. Args: user_id: Optional user ID (admin only) limit: Maximum number of log entries to return Returns: List of access log entries """ handler = registry.get_handler("access", AccessHandler) result = await handler.range_access_logs(user_id, limit) return format_tool_response(result) # ==================== SECURITY COMPLIANCE TOOLS ==================== @mcp.tool() async def security_audit(user_id: str | None = None) -> dict: """Run security audit on the range. 
Args: user_id: Optional user ID (admin only) Returns: Security audit report """ handler = registry.get_handler("security_compliance", SecurityComplianceHandler) result = await handler.security_audit(user_id) return format_tool_response(result) @mcp.tool() async def compliance_check( framework: str = "nist", user_id: str | None = None ) -> dict: """Check compliance against security framework. Args: framework: Security framework to check against (nist, pci, iso27001) user_id: Optional user ID (admin only) Returns: Compliance check results """ handler = registry.get_handler("security_compliance", SecurityComplianceHandler) result = await handler.compliance_check(framework, user_id) return format_tool_response(result) @mcp.tool() async def rotate_credentials(user_id: str | None = None) -> dict: """Rotate credentials for the range. Args: user_id: Optional user ID (admin only) Returns: Credential rotation result """ handler = registry.get_handler("security_compliance", SecurityComplianceHandler) result = await handler.rotate_credentials(user_id) return format_tool_response(result) @mcp.tool() async def get_vulnerability_scan(user_id: str | None = None) -> dict: """Get vulnerability scan results for the range. Args: user_id: Optional user ID (admin only) Returns: Vulnerability scan results """ handler = registry.get_handler("security_compliance", SecurityComplianceHandler) result = await handler.get_vulnerability_scan(user_id) return format_tool_response(result) return mcp

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.