Skip to main content
Glama
role_manager.py37 kB
"""Manager for ensuring Ludus roles are installed.""" import asyncio import os import subprocess import tempfile from pathlib import Path from typing import Any from ludus_mcp.core.client import LudusAPIClient from ludus_mcp.utils.logging import get_logger logger = get_logger(__name__) class RoleManager: """Manages Ludus Ansible role installation.""" # Roles that need to be cloned from GitHub before installation # These roles must be installed from local directories # See: https://docs.ludus.cloud/docs/roles/ # Format: "role_name": ("repository_url", "subdirectory_path" or None) ROLE_REPOSITORIES = { # AD Roles - These must be cloned on Ludus server and installed from directory # Expected location: /opt/ludus/roles/<role-name>/ "ludus-ad-content": ("https://github.com/Cyblex-Consulting/ludus-ad-content", None), "ludus-ad-vulns": ("https://github.com/Primusinterp/ludus-ad-vulns", None), "ludus_child_domain": ("https://github.com/ChoiSG/ludus_ansible_roles", None), "ludus_badblood": ("https://github.com/curi0usJack/ludus_badblood", None), "ludus-local-users": ("https://github.com/Cyblex-Consulting/ludus-local-users", None), # Security & Monitoring Roles "ludus_graylog_server": ("https://github.com/frack113/my-ludus-roles", None), "ludus_enable_asr": ("https://github.com/curi0usJack/Ludus-MDE-MDI-Roles", "ludus_enable_asr"), "ludus_enable_mdi_gpo": ("https://github.com/curi0usJack/Ludus-MDE-MDI-Roles", "ludus_enable_mdi_gpo"), # Web Application Roles "ludus_juiceshop": ("https://github.com/xurger/ludus_juiceshop", None), # Additional roles from Ludus documentation that require directory installation # See: https://docs.ludus.cloud/docs/roles/ "ludus_aurora_agent": ("https://github.com/frack113/ludus_aurora_agent", None), "ludus_filigran_opencti": ("https://github.com/frack113/ludus_filigran_opencti", None), "ludus_ghosts_server": ("https://github.com/frack113/ludus_ghosts_server", None), "ludus_adtimeline_syncthing": ("https://github.com/mojeda101/ludus_adtimeline_syncthing", None), "ludus_litterbox": ("https://github.com/professor-moody/ludus_litterbox_role", None), "ludus_nemesis": ("https://github.com/brmkit/ludus_nemesis", None), "ludus_guacamole": ("https://github.com/brmkit/ludus_guacamole", None), } # Default server-side path for cloned roles # Roles should be cloned to /opt/ludus/roles/ on the Ludus server SERVER_ROLES_PATH = "/opt/ludus/roles" # Common roles needed for scenarios # Note: Templates already have Windows/Linux systems built # These roles are for services that need to be installed on top # See: https://docs.ludus.cloud/docs/roles/ REQUIRED_ROLES = { "ad": [ # AD roles that need to be cloned and installed from local directories "ludus-ad-content", # Creates AD structure (OUs, groups, users) "ludus-ad-vulns", # Adds vulnerabilities to AD ], "ad-intermediate": [ # Additional roles for intermediate/advanced AD scenarios "ludus-ad-content", "ludus-ad-vulns", "badsectorlabs.ludus_adcs", # AD Certificate Services ], "ad-advanced": [ # Additional roles for advanced AD scenarios "ludus-ad-content", "ludus-ad-vulns", "badsectorlabs.ludus_adcs", # AD Certificate Services "ludus_child_domain", # Child domain creation ], "sql": [ # SQL Server roles "badsectorlabs.ludus_mssql", # SQL Server installation ], "template": [ # Roles for building custom templates "badsectorlabs.ludus_commandovm", # Sets up Commando VM on Windows >= 10 "badsectorlabs.ludus_flarevm", # Installs Flare VM on Windows >= 10 "badsectorlabs.ludus_remnux", # Installs REMnux on Ubuntu 20.04 ], "web": [ # Web service roles added in scenario builders if needed ], "network": [ # Network roles added in scenario builders if needed ], "purple-team": [ # Purple team specific roles added in scenario builders ], "wazuh": [ "aleemladha.wazuh_server_install", # Wazuh server role from Ludus docs "aleemladha.ludus_wazuh_agent", # Wazuh agent role from Ludus docs ], "splunk": [ # Splunk roles added in scenario builders if needed ], "elastic": [ "badsectorlabs.ludus_elastic_container", # Elastic container from Ludus docs "badsectorlabs.ludus_elastic_agent", # Elastic agent from Ludus docs ], "security-onion": [ # Security Onion roles added in scenario builders if needed ], "goad": [ # GOAD scenarios use templates, no additional roles needed ], } def __init__( self, client: LudusAPIClient, roles_cache_dir: str | None = None, ssh_host: str | None = None, ssh_user: str | None = None, ssh_key_path: str | None = None, ssh_password: str | None = None, allow_ssh_install: bool = False, ): """Initialize the role manager. Args: client: Ludus API client roles_cache_dir: Directory to cache cloned role repositories (default: ~/.ludus/roles) ssh_host: SSH hostname/IP for Ludus server (optional, for automatic role installation) ssh_user: SSH username for Ludus server (optional) ssh_key_path: Path to SSH private key (optional, preferred over password) ssh_password: SSH password (optional, less secure than key) allow_ssh_install: Allow automatic installation via SSH (default: False, requires explicit permission) """ self.client = client if roles_cache_dir: self.roles_cache_dir = Path(roles_cache_dir) else: # Default to ~/.ludus/roles self.roles_cache_dir = Path.home() / ".ludus" / "roles" self.roles_cache_dir.mkdir(parents=True, exist_ok=True) # SSH configuration for automatic role installation on Ludus server self.ssh_host = ssh_host or os.getenv("LUDUS_SSH_HOST") self.ssh_user = ssh_user or os.getenv("LUDUS_SSH_USER", "root") self.ssh_key_path = ssh_key_path or os.getenv("LUDUS_SSH_KEY_PATH") self.ssh_password = ssh_password or os.getenv("LUDUS_SSH_PASSWORD") self.allow_ssh_install = allow_ssh_install or os.getenv("LUDUS_ALLOW_SSH_INSTALL", "false").lower() == "true" # Validate SSH configuration if enabled if self.allow_ssh_install: if not self.ssh_host: logger.warning( "SSH installation enabled but LUDUS_SSH_HOST not set. " "Automatic role installation via SSH will be disabled." ) self.allow_ssh_install = False elif not self.ssh_key_path and not self.ssh_password: logger.warning( "SSH installation enabled but neither LUDUS_SSH_KEY_PATH nor LUDUS_SSH_PASSWORD set. " "Automatic role installation via SSH will be disabled." ) self.allow_ssh_install = False async def get_installed_roles(self) -> list[dict]: """Get list of all installed roles. Prefers using the ludus CLI with --url for reliable remote checking. Falls back to HTTP API if CLI is not available. Returns: List of role dictionaries with name, version, and global status """ import shutil import subprocess from ludus_mcp.utils.config import get_settings settings = get_settings() # Prefer ludus CLI if available (works both locally and remotely with --url) ludus_cli = shutil.which("ludus") if ludus_cli and settings.ludus_api_key: try: env = os.environ.copy() env["LUDUS_API_KEY"] = settings.ludus_api_key result = subprocess.run( ["ludus", "ansible", "roles", "list", "--url", settings.ludus_api_url], capture_output=True, text=True, timeout=30, env=env ) if result.returncode == 0: output = result.stdout + result.stderr roles = [] # Parse the table output # Format: | NAME | VERSION | GLOBAL | for line in output.split('\n'): if '|' in line: parts = [p.strip() for p in line.split('|') if p.strip()] # Skip header row and separator rows if len(parts) >= 3 and parts[0] not in ('NAME', '+', '-'): if not parts[0].startswith('-') and not parts[0].startswith('+'): roles.append({ "name": parts[0], "version": parts[1] if len(parts) > 1 else None, "global": parts[2].lower() == "true" if len(parts) > 2 else False, "type": "role", }) return roles except subprocess.TimeoutExpired: logger.warning("Timeout listing roles via CLI, falling back to API") except Exception as e: logger.warning(f"Error listing roles via CLI: {e}, falling back to API") # Fallback to HTTP API try: ansible_resources = await self.client.list_ansible_resources() if isinstance(ansible_resources, dict): roles = ansible_resources.get("roles", []) elif isinstance(ansible_resources, list): roles = ansible_resources else: roles = [] # Normalize role format formatted_roles = [] for role in roles: if isinstance(role, str): formatted_roles.append({"name": role, "type": "role"}) elif isinstance(role, dict): formatted_roles.append({ "name": role.get("Name") or role.get("name", "unknown"), "version": role.get("Version") or role.get("version"), "type": role.get("Type") or role.get("type", "role"), "global": role.get("Global") or role.get("global", False), }) return formatted_roles except Exception as e: logger.warning(f"Error listing roles: {e}") return [] async def check_role_installed(self, role_name: str) -> bool: """Check if a role is installed. Prefers using the ludus CLI with --url for reliable remote checking. Falls back to HTTP API if CLI is not available. The API returns roles as a list of dicts with capital "Name" key: [{"Name": "role-name", "Version": "...", "Type": "role", "Global": False}, ...] """ import shutil import subprocess from ludus_mcp.utils.config import get_settings settings = get_settings() # Prefer ludus CLI if available (works both locally and remotely with --url) ludus_cli = shutil.which("ludus") if ludus_cli and settings.ludus_api_key: try: env = os.environ.copy() env["LUDUS_API_KEY"] = settings.ludus_api_key result = subprocess.run( ["ludus", "ansible", "roles", "list", "--url", settings.ludus_api_url], capture_output=True, text=True, timeout=30, env=env ) if result.returncode == 0: output = result.stdout + result.stderr # Parse the table output - role names are in the first column # Format: | role-name | version | global | for line in output.split('\n'): if '|' in line and role_name in line: # Extract the role name from the table row parts = [p.strip() for p in line.split('|') if p.strip()] if parts and parts[0] == role_name: return True return False except subprocess.TimeoutExpired: logger.warning(f"Timeout checking role {role_name} via CLI, falling back to API") except Exception as e: logger.warning(f"Error checking role via CLI: {e}, falling back to API") # Fallback to HTTP API try: ansible_resources = await self.client.list_ansible_resources() # API returns roles as a list or dict, check both formats if isinstance(ansible_resources, dict): roles = ansible_resources.get("roles", []) elif isinstance(ansible_resources, list): roles = ansible_resources else: roles = [] # Check if role exists (could be string or dict) for role in roles: if isinstance(role, str): if role == role_name: return True elif isinstance(role, dict): # API returns capital "Name" key, but check both for compatibility role_name_in_dict = ( role.get("Name") or # Capital N (actual API format) role.get("name") or # Lowercase n (fallback) role.get("role") # Alternative key ) if role_name_in_dict == role_name: return True return False except Exception as e: logger.warning(f"Error checking role {role_name}: {e}") return False async def clone_role_on_server_via_ssh(self, role_name: str) -> dict[str, Any]: """Clone a role repository directly on the Ludus server via SSH. This method requires SSH access to the Ludus server and will clone the repository to /opt/ludus/roles/<role-name> on the server. Args: role_name: Name of the role (must be in ROLE_REPOSITORIES) Returns: Result dict with status and server path Raises: ValueError: If role_name is not in ROLE_REPOSITORIES or SSH not configured Exception: If SSH command fails """ if not self.allow_ssh_install: raise ValueError( "SSH installation not enabled. Set allow_ssh_install=True and configure SSH credentials." ) if role_name not in self.ROLE_REPOSITORIES: raise ValueError( f"Role {role_name} not in ROLE_REPOSITORIES. " f"Available: {list(self.ROLE_REPOSITORIES.keys())}" ) repo_info = self.ROLE_REPOSITORIES[role_name] if isinstance(repo_info, tuple): repo_url, subdirectory = repo_info else: repo_url = repo_info subdirectory = None server_role_path = f"{self.SERVER_ROLES_PATH}/{role_name}" # Build SSH command ssh_cmd = ["ssh"] # Add SSH key if provided if self.ssh_key_path: ssh_cmd.extend(["-i", self.ssh_key_path]) # Add options to avoid host key checking prompts ssh_cmd.extend([ "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "-o", "ConnectTimeout=10", ]) # Build remote command remote_cmd_parts = [ f"mkdir -p {self.SERVER_ROLES_PATH}", f"cd {self.SERVER_ROLES_PATH}", ] # Check if directory already exists if subdirectory: repo_name = repo_url.split("/")[-1].replace(".git", "") clone_dir = f"{self.SERVER_ROLES_PATH}/{repo_name}" remote_cmd_parts.append( f"if [ ! -d '{clone_dir}' ]; then git clone {repo_url} {repo_name}; fi" ) remote_cmd_parts.append(f"cd {repo_name}") remote_cmd_parts.append(f"git pull || true") # Update if exists else: remote_cmd_parts.append( f"if [ ! -d '{server_role_path}' ]; then git clone {repo_url} {role_name}; else cd {role_name} && git pull; fi" ) remote_cmd = " && ".join(remote_cmd_parts) # Add password authentication if needed (using sshpass) if self.ssh_password and not self.ssh_key_path: # Use sshpass for password authentication ssh_cmd = ["sshpass", "-p", self.ssh_password] + ssh_cmd ssh_cmd.append(f"{self.ssh_user}@{self.ssh_host}") ssh_cmd.append(remote_cmd) logger.info(f"Cloning {role_name} on Ludus server via SSH: {self.ssh_host}") logger.debug(f"SSH command: {' '.join(ssh_cmd[:3])}... {remote_cmd[:100]}...") try: result = subprocess.run( ssh_cmd, check=True, capture_output=True, text=True, timeout=120 ) logger.info(f"Successfully cloned {role_name} on Ludus server to {server_role_path}") return { "status": "success", "role_name": role_name, "server_path": server_role_path, "message": f"Role {role_name} cloned successfully on Ludus server", } except subprocess.TimeoutExpired: raise Exception(f"Timeout cloning {role_name} on Ludus server via SSH") except subprocess.CalledProcessError as e: error_output = e.stderr if e.stderr else e.stdout if e.stdout else "Unknown error" raise Exception( f"Failed to clone {role_name} on Ludus server via SSH: {error_output}" ) async def clone_role_repository(self, role_name: str) -> Path: """Clone a role repository from GitHub. Handles roles that are in subdirectories of repositories. Args: role_name: Name of the role (must be in ROLE_REPOSITORIES) Returns: Path to the role directory (may be a subdirectory of cloned repo) Raises: ValueError: If role_name is not in ROLE_REPOSITORIES Exception: If cloning fails """ if role_name not in self.ROLE_REPOSITORIES: raise ValueError( f"Role {role_name} not in ROLE_REPOSITORIES. " f"Available: {list(self.ROLE_REPOSITORIES.keys())}" ) repo_info = self.ROLE_REPOSITORIES[role_name] if isinstance(repo_info, tuple): repo_url, subdirectory = repo_info else: # Backward compatibility: treat as string URL repo_url = repo_info subdirectory = None # Determine clone directory name (use repo name or role name) if subdirectory: # For roles in subdirectories, clone to repo name, then point to subdirectory repo_name = repo_url.split("/")[-1].replace(".git", "") clone_dir = self.roles_cache_dir / repo_name role_dir = clone_dir / subdirectory else: # For standalone roles, clone directly to role name clone_dir = self.roles_cache_dir / role_name role_dir = clone_dir # Check if already cloned if clone_dir.exists() and (clone_dir / ".git").exists(): logger.info(f"Repository for {role_name} already cloned at {clone_dir}") # Optionally update it try: subprocess.run( ["git", "pull"], cwd=clone_dir, check=True, capture_output=True, timeout=30 ) logger.debug(f"Updated repository for {role_name}") except subprocess.TimeoutExpired: logger.warning(f"Timeout updating repository for {role_name}") except subprocess.CalledProcessError as e: logger.warning(f"Failed to update repository for {role_name}: {e}") # Verify subdirectory exists if needed if subdirectory and not role_dir.exists(): raise Exception( f"Role subdirectory {subdirectory} not found in cloned repository {clone_dir}. " f"Expected path: {role_dir}" ) return role_dir # Clone the repository logger.info(f"Cloning repository for {role_name} from {repo_url} to {clone_dir}") try: subprocess.run( ["git", "clone", repo_url, str(clone_dir)], check=True, capture_output=True, timeout=120 ) logger.info(f"Successfully cloned repository for {role_name} to {clone_dir}") # Verify subdirectory exists if needed if subdirectory: if not role_dir.exists(): raise Exception( f"Role subdirectory {subdirectory} not found in cloned repository. " f"Expected path: {role_dir}. " f"Available directories: {[d.name for d in clone_dir.iterdir() if d.is_dir()]}" ) logger.info(f"Role {role_name} is in subdirectory: {role_dir}") return role_dir except subprocess.TimeoutExpired: raise Exception(f"Timeout cloning repository for {role_name}") except subprocess.CalledProcessError as e: error_msg = e.stderr.decode() if e.stderr else str(e) raise Exception(f"Failed to clone repository for {role_name}: {error_msg}") async def install_role_from_directory(self, role_dir: Path, role_name: str | None = None) -> dict[str, Any]: """Install an Ansible role from a directory on the Ludus server. IMPORTANT: The directory path must exist on the Ludus server, not the client. Roles should be cloned to /opt/ludus/roles/ on the Ludus server. Args: role_dir: Path to the role directory (on Ludus server, e.g., /opt/ludus/roles/ludus-ad-content) role_name: Optional role name (defaults to directory name) Returns: Installation result dict Raises: Exception: If installation fails """ role_name = role_name or role_dir.name logger.info(f"Installing role {role_name} from server directory: {role_dir}") # Use server-side path (absolute path on Ludus server) # The API expects paths on the Ludus server, not the client server_path = str(role_dir) if str(role_dir).startswith("/") else f"{self.SERVER_ROLES_PATH}/{role_dir}" config = { "action": "install", "name": role_name, "directory": server_path, } try: result = await self.client.install_ansible_role(config) logger.info(f"[OK] Successfully installed role {role_name} from directory: {server_path}") return result except Exception as e: error_msg = str(e) logger.warning(f"API installation failed for {role_name}: {error_msg}") logger.info(f"Role may need to be cloned on Ludus server first:") logger.info(f" cd {self.SERVER_ROLES_PATH}") logger.info(f" git clone {self.ROLE_REPOSITORIES.get(role_name, ('<repository-url>', None))[0]}") logger.info(f" ludus ansible role add -d {server_path}") # Return a result indicating manual installation is needed return { "status": "manual_installation_required", "role_name": role_name, "directory": server_path, "command": f"ludus ansible role add -d {server_path}", "note": f"Clone repository on Ludus server to {server_path} first, then install" } async def install_role( self, role_name: str, role_url: str | None = None, max_retries: int = 3 ) -> dict[str, Any]: """ Install an Ansible role with retry logic. If the role is in ROLE_REPOSITORIES, it will be cloned and installed from local directory. Otherwise, it will be installed from Ansible Galaxy. Args: role_name: Name of the role to install role_url: Optional URL to install from (overrides repository lookup) max_retries: Maximum number of retry attempts (default: 3) Returns: Installation result dict Raises: Exception: If all retry attempts fail """ # Check if this role needs to be installed from a directory on the Ludus server if role_name in self.ROLE_REPOSITORIES and not role_url: logger.info(f"Role {role_name} must be installed from directory on Ludus server") # Use server-side path (assumes role is cloned to /opt/ludus/roles/<role-name>) server_role_path = Path(f"{self.SERVER_ROLES_PATH}/{role_name}") # Try to install from server directory # Note: The directory must already exist on the Ludus server # If it doesn't exist, try SSH installation if enabled try: result = await self.install_role_from_directory(server_role_path, role_name) if result.get("status") == "manual_installation_required": # Directory doesn't exist on server # Try SSH installation if enabled if self.allow_ssh_install: logger.info(f"Attempting to clone {role_name} on Ludus server via SSH...") try: clone_result = await self.clone_role_on_server_via_ssh(role_name) logger.info(f"Successfully cloned {role_name} on server, retrying installation...") # Retry installation now that directory exists result = await self.install_role_from_directory(server_role_path, role_name) if result.get("status") == "manual_installation_required": # Still failed, provide instructions repo_url = self.ROLE_REPOSITORIES[role_name][0] raise Exception( f"Role {role_name} cloned on server but installation failed. " f"Try manually: ludus ansible role add -d {self.SERVER_ROLES_PATH}/{role_name}" ) return result except Exception as ssh_error: logger.warning(f"SSH installation failed: {ssh_error}") # Fall through to manual instructions # Provide manual installation instructions repo_url = self.ROLE_REPOSITORIES[role_name][0] raise Exception( f"Role {role_name} not found on Ludus server. " f"Clone it first: cd {self.SERVER_ROLES_PATH} && git clone {repo_url} && " f"ludus ansible role add -d {self.SERVER_ROLES_PATH}/{role_name}" ) return result except Exception as e: error_msg = str(e) # If it's already our formatted error, re-raise it if "not found on Ludus server" in error_msg or "Clone it first" in error_msg: raise # Otherwise, provide helpful error message repo_url = self.ROLE_REPOSITORIES[role_name][0] raise Exception( f"Failed to install {role_name} from directory. " f"Ensure it's cloned on Ludus server: cd {self.SERVER_ROLES_PATH} && git clone {repo_url}" ) from e # Standard installation from Ansible Galaxy logger.info(f"Installing Ludus role from Ansible Galaxy: {role_name}") config = { "action": "install", "name": role_name, } if role_url: config["url"] = role_url last_error = None for attempt in range(max_retries): try: logger.debug(f"Install attempt {attempt + 1}/{max_retries} for role: {role_name}") result = await self.client.install_ansible_role(config) logger.info(f"Successfully installed role: {role_name}") return result except Exception as e: last_error = e error_msg = str(e) logger.warning( f"Failed to install role {role_name} (attempt {attempt + 1}/{max_retries}): {error_msg}" ) # Don't retry if it's a 404 (role not found) or similar permanent errors if "404" in error_msg or "not found" in error_msg.lower(): logger.error(f"Role {role_name} not found in repository. Cannot retry.") raise # Wait before retrying (exponential backoff) if attempt < max_retries - 1: wait_time = 2 ** attempt # 1s, 2s, 4s logger.debug(f"Waiting {wait_time}s before retry...") await asyncio.sleep(wait_time) # All retries failed logger.error(f"Failed to install role {role_name} after {max_retries} attempts") raise Exception( f"Failed to install role {role_name} after {max_retries} attempts. " f"Last error: {last_error}" ) async def ensure_roles_for_scenario( self, scenario_key: str, auto_install: bool = True, siem_type: str = "wazuh" ) -> dict[str, Any]: """Ensure required roles are installed for a scenario.""" logger.info(f"Checking roles for scenario: {scenario_key} with SIEM: {siem_type}") # Determine required roles based on scenario type required_roles = [] if scenario_key.startswith("goad"): required_roles.extend(self.REQUIRED_ROLES.get("goad", [])) elif (scenario_key.startswith("ad-") or scenario_key.startswith("purple-") or scenario_key.startswith("red-vs-blue") or scenario_key.startswith("redteam-lab") or scenario_key.startswith("blueteam-lab")): # Determine AD role set based on scenario level if scenario_key.endswith("-intermediate"): required_roles.extend(self.REQUIRED_ROLES.get("ad-intermediate", [])) elif scenario_key.endswith("-advanced"): required_roles.extend(self.REQUIRED_ROLES.get("ad-advanced", [])) else: required_roles.extend(self.REQUIRED_ROLES.get("ad", [])) # Check if scenario needs SQL Server (intermediate/advanced red team scenarios) if scenario_key.startswith("redteam-lab") and ( scenario_key.endswith("-intermediate") or scenario_key.endswith("-advanced") ): required_roles.extend(self.REQUIRED_ROLES.get("sql", [])) if scenario_key.startswith("web-") or scenario_key in ["dvwa", "owasp-top10"]: required_roles.extend(self.REQUIRED_ROLES.get("web", [])) if scenario_key.startswith("network-") or scenario_key == "wireless": required_roles.extend(self.REQUIRED_ROLES.get("network", [])) if scenario_key.startswith("purple-") or scenario_key == "red-vs-blue": required_roles.extend(self.REQUIRED_ROLES.get("purple-team", [])) # Add SIEM-specific roles if siem_type == "wazuh": required_roles.extend(self.REQUIRED_ROLES.get("wazuh", [])) elif siem_type == "splunk": required_roles.extend(self.REQUIRED_ROLES.get("splunk", [])) elif siem_type == "elastic": required_roles.extend(self.REQUIRED_ROLES.get("elastic", [])) elif siem_type == "security-onion": required_roles.extend(self.REQUIRED_ROLES.get("security-onion", [])) # Remove duplicates required_roles = list(set(required_roles)) if not required_roles: logger.info("No specific roles required for this scenario") return {"status": "no_roles_required", "roles": []} results = { "status": "checked", "required_roles": required_roles, "installed": [], "missing": [], "install_attempted": [], "install_failed": [], } for role_name in required_roles: is_installed = await self.check_role_installed(role_name) if is_installed: logger.info(f"[OK] Role {role_name} is already installed") results["installed"].append(role_name) else: logger.warning(f"Role {role_name} is NOT installed") results["missing"].append(role_name) if auto_install: try: logger.info(f"Attempting to install role: {role_name}") install_result = await self.install_role(role_name) results["install_attempted"].append(role_name) # Check if installation was successful if install_result.get("status") == "manual_installation_required": # Directory-based role needs manual setup error_msg = install_result.get("note", "Manual installation required") logger.warning(f"Role {role_name} requires manual setup: {error_msg}") results["install_failed"].append({ "role": role_name, "error": error_msg, "command": install_result.get("command", "") }) else: # Verify installation succeeded is_now_installed = await self.check_role_installed(role_name) if is_now_installed: logger.info(f"[OK] Successfully installed role: {role_name}") results["installed"].append(role_name) if role_name in results["missing"]: results["missing"].remove(role_name) else: logger.warning(f"Role {role_name} installation reported success but not found in installed roles") results["install_failed"].append({ "role": role_name, "error": "Installation reported success but role not found" }) except Exception as e: error_msg = str(e) logger.error(f"Failed to install role {role_name}: {error_msg}") results["install_failed"].append({"role": role_name, "error": error_msg}) if results["missing"] and not auto_install: logger.warning( f"Missing roles: {results['missing']}. " f"Set auto_install=True to install automatically." ) return results

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server