Skip to main content
Glama
security_compliance.py (8.9 kB)
"""Handler for security and compliance operations.""" from datetime import datetime from typing import Any import hashlib import secrets from ludus_mcp.core.client import LudusAPIClient from ludus_mcp.utils.logging import get_logger logger = get_logger(__name__) class SecurityComplianceHandler: """Handler for security and compliance.""" def __init__(self, client: LudusAPIClient) -> None: """Initialize the security compliance handler.""" self.client = client async def security_audit(self, user_id: str | None = None) -> dict[str, Any]: """Audit range for security best practices.""" try: range_info = await self.client.get_range(user_id) range_config = await self.client.get_range_config(user_id) findings = [] recommendations = [] # Check for default passwords config_str = str(range_config).lower() if any(pwd in config_str for pwd in ["password", "123456", "admin"]): findings.append({ "severity": "high", "category": "credentials", "finding": "Potential default or weak passwords detected in configuration" }) recommendations.append("Use strong, unique passwords for all VMs") # Check for unencrypted communications if "ssl: false" in config_str or "tls: false" in config_str: findings.append({ "severity": "medium", "category": "encryption", "finding": "Unencrypted communication channels detected" }) recommendations.append("Enable SSL/TLS for all services") # Check testing state if range_info.get("testingEnabled"): findings.append({ "severity": "info", "category": "exposure", "finding": "Range has testing enabled (external access possible)" }) recommendations.append("Disable testing when not actively conducting exercises") security_score = 100 - (len([f for f in findings if f["severity"] == "high"]) * 20) - (len([f for f in findings if f["severity"] == "medium"]) * 10) return { "status": "success", "timestamp": datetime.now().isoformat(), "security_score": max(0, security_score), "grade": self._score_to_grade(security_score), "findings": findings, "recommendations": 
recommendations, "summary": { "high_severity": sum(1 for f in findings if f["severity"] == "high"), "medium_severity": sum(1 for f in findings if f["severity"] == "medium"), "low_severity": sum(1 for f in findings if f["severity"] == "low"), "info": sum(1 for f in findings if f["severity"] == "info") } } except Exception as e: logger.error(f"Error performing security audit: {e}") return {"status": "error", "error": str(e)} async def compliance_check( self, framework: str = "general", user_id: str | None = None ) -> dict[str, Any]: """Check compliance with organizational policies.""" try: range_info = await self.client.get_range(user_id) compliance_results = { "framework": framework, "checks": [], "compliant": True } # Resource limits compliance vms = range_info.get("VMs", []) total_memory = sum(vm.get("memory", 0) for vm in vms) / 1024 if total_memory > 64: compliance_results["checks"].append({ "control": "RESOURCE-001", "description": "Memory usage under 64GB", "status": "non_compliant", "current_value": f"{total_memory:.2f} GB" }) compliance_results["compliant"] = False else: compliance_results["checks"].append({ "control": "RESOURCE-001", "description": "Memory usage under 64GB", "status": "compliant", "current_value": f"{total_memory:.2f} GB" }) # Snapshot policy compliance try: snapshots = await self.client.list_snapshots(user_id) has_snapshots = len(snapshots) > 0 if snapshots else False compliance_results["checks"].append({ "control": "BACKUP-001", "description": "Regular snapshots exist", "status": "compliant" if has_snapshots else "non_compliant" }) if not has_snapshots: compliance_results["compliant"] = False except Exception: pass return { "status": "success", "timestamp": datetime.now().isoformat(), "compliance_results": compliance_results, "overall_compliant": compliance_results["compliant"] } except Exception as e: logger.error(f"Error checking compliance: {e}") return {"status": "error", "error": str(e)} async def rotate_credentials( self, vm_names: 
list[str] | None = None, user_id: str | None = None ) -> dict[str, Any]: """Bulk credential rotation for VMs.""" try: range_info = await self.client.get_range(user_id) vms = range_info.get("VMs", []) if vm_names: vms = [vm for vm in vms if vm.get("name") in vm_names] rotations = [] for vm in vms: new_password = self._generate_strong_password() rotations.append({ "vm_name": vm.get("name"), "new_password": new_password, "rotation_timestamp": datetime.now().isoformat(), "note": "Apply using Ansible playbook: ansible.builtin.user module" }) return { "status": "success", "timestamp": datetime.now().isoformat(), "rotations": rotations, "vm_count": len(rotations), "implementation_guide": { "ansible_playbook": "Use ansible.builtin.user with update_password: always", "windows": "Use win_user module for Windows VMs" } } except Exception as e: logger.error(f"Error rotating credentials: {e}") return {"status": "error", "error": str(e)} async def get_vulnerability_scan(self, user_id: str | None = None) -> dict[str, Any]: """Integration point for vulnerability scanners.""" try: range_info = await self.client.get_range(user_id) inventory = await self.client.get_range_ansible_inventory(user_id) vms = range_info.get("VMs", []) scan_targets = [] for vm in vms: scan_targets.append({ "hostname": vm.get("name"), "ip": vm.get("ip", "unknown"), "os": vm.get("template", "unknown"), "status": vm.get("status") }) return { "status": "success", "timestamp": datetime.now().isoformat(), "scan_targets": scan_targets, "target_count": len(scan_targets), "integration_guide": { "nessus": "Import targets into Nessus scanner", "openvas": "Use targets for OpenVAS scan", "ansible": "Use ansible community.general.nmap module" }, "note": "This provides target information. Actual scanning requires external tools." 
} except Exception as e: logger.error(f"Error getting vulnerability scan info: {e}") return {"status": "error", "error": str(e)} def _generate_strong_password(self, length: int = 16) -> str: """Generate a strong random password.""" alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*" return ''.join(secrets.choice(alphabet) for _ in range(length)) def _score_to_grade(self, score: float) -> str: """Convert score to letter grade.""" if score >= 90: return "A" elif score >= 80: return "B" elif score >= 70: return "C" elif score >= 60: return "D" else: return "F"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.