We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""Handler for security and compliance operations."""
from datetime import datetime
from typing import Any
import hashlib
import secrets
from ludus_mcp.core.client import LudusAPIClient
from ludus_mcp.utils.logging import get_logger
logger = get_logger(__name__)
class SecurityComplianceHandler:
"""Handler for security and compliance."""
def __init__(self, client: LudusAPIClient) -> None:
"""Initialize the security compliance handler."""
self.client = client
async def security_audit(self, user_id: str | None = None) -> dict[str, Any]:
"""Audit range for security best practices."""
try:
range_info = await self.client.get_range(user_id)
range_config = await self.client.get_range_config(user_id)
findings = []
recommendations = []
# Check for default passwords
config_str = str(range_config).lower()
if any(pwd in config_str for pwd in ["password", "123456", "admin"]):
findings.append({
"severity": "high",
"category": "credentials",
"finding": "Potential default or weak passwords detected in configuration"
})
recommendations.append("Use strong, unique passwords for all VMs")
# Check for unencrypted communications
if "ssl: false" in config_str or "tls: false" in config_str:
findings.append({
"severity": "medium",
"category": "encryption",
"finding": "Unencrypted communication channels detected"
})
recommendations.append("Enable SSL/TLS for all services")
# Check testing state
if range_info.get("testingEnabled"):
findings.append({
"severity": "info",
"category": "exposure",
"finding": "Range has testing enabled (external access possible)"
})
recommendations.append("Disable testing when not actively conducting exercises")
security_score = 100 - (len([f for f in findings if f["severity"] == "high"]) * 20) - (len([f for f in findings if f["severity"] == "medium"]) * 10)
return {
"status": "success",
"timestamp": datetime.now().isoformat(),
"security_score": max(0, security_score),
"grade": self._score_to_grade(security_score),
"findings": findings,
"recommendations": recommendations,
"summary": {
"high_severity": sum(1 for f in findings if f["severity"] == "high"),
"medium_severity": sum(1 for f in findings if f["severity"] == "medium"),
"low_severity": sum(1 for f in findings if f["severity"] == "low"),
"info": sum(1 for f in findings if f["severity"] == "info")
}
}
except Exception as e:
logger.error(f"Error performing security audit: {e}")
return {"status": "error", "error": str(e)}
async def compliance_check(
self,
framework: str = "general",
user_id: str | None = None
) -> dict[str, Any]:
"""Check compliance with organizational policies."""
try:
range_info = await self.client.get_range(user_id)
compliance_results = {
"framework": framework,
"checks": [],
"compliant": True
}
# Resource limits compliance
vms = range_info.get("VMs", [])
total_memory = sum(vm.get("memory", 0) for vm in vms) / 1024
if total_memory > 64:
compliance_results["checks"].append({
"control": "RESOURCE-001",
"description": "Memory usage under 64GB",
"status": "non_compliant",
"current_value": f"{total_memory:.2f} GB"
})
compliance_results["compliant"] = False
else:
compliance_results["checks"].append({
"control": "RESOURCE-001",
"description": "Memory usage under 64GB",
"status": "compliant",
"current_value": f"{total_memory:.2f} GB"
})
# Snapshot policy compliance
try:
snapshots = await self.client.list_snapshots(user_id)
has_snapshots = len(snapshots) > 0 if snapshots else False
compliance_results["checks"].append({
"control": "BACKUP-001",
"description": "Regular snapshots exist",
"status": "compliant" if has_snapshots else "non_compliant"
})
if not has_snapshots:
compliance_results["compliant"] = False
except Exception:
pass
return {
"status": "success",
"timestamp": datetime.now().isoformat(),
"compliance_results": compliance_results,
"overall_compliant": compliance_results["compliant"]
}
except Exception as e:
logger.error(f"Error checking compliance: {e}")
return {"status": "error", "error": str(e)}
async def rotate_credentials(
self,
vm_names: list[str] | None = None,
user_id: str | None = None
) -> dict[str, Any]:
"""Bulk credential rotation for VMs."""
try:
range_info = await self.client.get_range(user_id)
vms = range_info.get("VMs", [])
if vm_names:
vms = [vm for vm in vms if vm.get("name") in vm_names]
rotations = []
for vm in vms:
new_password = self._generate_strong_password()
rotations.append({
"vm_name": vm.get("name"),
"new_password": new_password,
"rotation_timestamp": datetime.now().isoformat(),
"note": "Apply using Ansible playbook: ansible.builtin.user module"
})
return {
"status": "success",
"timestamp": datetime.now().isoformat(),
"rotations": rotations,
"vm_count": len(rotations),
"implementation_guide": {
"ansible_playbook": "Use ansible.builtin.user with update_password: always",
"windows": "Use win_user module for Windows VMs"
}
}
except Exception as e:
logger.error(f"Error rotating credentials: {e}")
return {"status": "error", "error": str(e)}
async def get_vulnerability_scan(self, user_id: str | None = None) -> dict[str, Any]:
"""Integration point for vulnerability scanners."""
try:
range_info = await self.client.get_range(user_id)
inventory = await self.client.get_range_ansible_inventory(user_id)
vms = range_info.get("VMs", [])
scan_targets = []
for vm in vms:
scan_targets.append({
"hostname": vm.get("name"),
"ip": vm.get("ip", "unknown"),
"os": vm.get("template", "unknown"),
"status": vm.get("status")
})
return {
"status": "success",
"timestamp": datetime.now().isoformat(),
"scan_targets": scan_targets,
"target_count": len(scan_targets),
"integration_guide": {
"nessus": "Import targets into Nessus scanner",
"openvas": "Use targets for OpenVAS scan",
"ansible": "Use ansible community.general.nmap module"
},
"note": "This provides target information. Actual scanning requires external tools."
}
except Exception as e:
logger.error(f"Error getting vulnerability scan info: {e}")
return {"status": "error", "error": str(e)}
def _generate_strong_password(self, length: int = 16) -> str:
"""Generate a strong random password."""
alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*"
return ''.join(secrets.choice(alphabet) for _ in range(length))
def _score_to_grade(self, score: float) -> str:
"""Convert score to letter grade."""
if score >= 90: return "A"
elif score >= 80: return "B"
elif score >= 70: return "C"
elif score >= 60: return "D"
else: return "F"