Skip to main content
Glama
service_installer.py57.1 kB
"""Service installation framework for homelab applications.""" import json import subprocess from pathlib import Path from typing import Any import yaml from .ssh_tools import ssh_execute_command # Service templates directory TEMPLATES_DIR = Path(__file__).parent / "service_templates" class AnsibleRunner: """Runner for executing Ansible playbooks.""" def __init__( self, playbook_path: str, inventory_path: str, variables: dict | None = None ): self.playbook_path = playbook_path self.inventory_path = inventory_path self.variables = variables or {} self.results: dict[str, Any] = {} async def run_playbook(self, extra_vars: dict | None = None) -> dict[str, Any]: """Execute the Ansible playbook.""" cmd = ["ansible-playbook", "-i", self.inventory_path, self.playbook_path] # Add extra variables all_vars = {**self.variables, **(extra_vars or {})} if all_vars: vars_json = json.dumps(all_vars) cmd.extend(["--extra-vars", vars_json]) try: result = subprocess.run(cmd, capture_output=True, text=True, check=False) self.results = { "return_code": result.returncode, "stdout": result.stdout, "stderr": result.stderr, "success": result.returncode == 0, } return self.results except Exception as e: self.results = { "return_code": -1, "stdout": "", "stderr": str(e), "success": False, } return self.results def generate_ansible_inventory( hostname: str, username: str, config: dict | None = None ) -> str: """Generate Ansible inventory content.""" inventory = f"""[servers] {hostname} ansible_user={username} ansible_ssh_private_key_file=~/.ssh/mcp_admin_rsa [servers:vars] ansible_python_interpreter=/usr/bin/python3 ansible_ssh_common_args='-o StrictHostKeyChecking=no' """ if config and "additional_vars" in config: for key, value in config["additional_vars"].items(): inventory += f"{key}={value}\n" return inventory def render_template(template_content: str, variables: dict[str, Any]) -> str: """Simple template rendering with variable substitution.""" result = template_content for key, value in 
variables.items(): placeholder = f"{{{{{key}}}}}" result = result.replace(placeholder, str(value)) return result class ServiceInstaller: """Framework for installing and managing homelab services.""" def __init__(self) -> None: self.templates = self._load_service_templates() def _load_service_templates(self) -> dict[str, dict[str, Any]]: """Load all service templates from the templates directory.""" templates = {} # Ensure templates directory exists TEMPLATES_DIR.mkdir(exist_ok=True) # Load YAML template files for template_file in TEMPLATES_DIR.glob("*.yaml"): try: with open(template_file) as f: service_data = yaml.safe_load(f) service_name = template_file.stem templates[service_name] = service_data except Exception as e: print(f"Warning: Failed to load template {template_file}: {e}") return templates def get_available_services(self) -> list[str]: """Get list of available service templates.""" return list(self.templates.keys()) def get_service_info(self, service_name: str) -> dict | None: """Get detailed information about a service.""" return self.templates.get(service_name) async def check_service_requirements( self, service_name: str, hostname: str, username: str = "mcp_admin", password: str | None = None, ) -> dict[str, Any]: """Check if a device meets the requirements for a service.""" if service_name not in self.templates: return {"status": "error", "error": f"Unknown service: {service_name}"} service = self.templates[service_name] requirements = service.get("requirements", {}) results: dict[str, Any] = { "service": service_name, "hostname": hostname, "requirements_met": True, "checks": {}, } # Check available ports if "ports" in requirements: for port in requirements["ports"]: # Check if port is available cmd = f"ss -tlnp | grep :{port}" port_result = await ssh_execute_command( hostname=hostname, username=username, password=password, command=cmd ) port_data = json.loads(port_result) port_available = ( port_data.get("exit_code", 1) != 0 ) # Port is free if 
command fails checks_dict = results.get("checks", {}) if isinstance(checks_dict, dict): checks_dict[f"port_{port}"] = { "required": True, "available": port_available, "status": "pass" if port_available else "fail", } if not port_available: results["requirements_met"] = False # Check available memory if "memory_gb" in requirements: cmd = "free -m | grep '^Mem:' | awk '{print $2}'" mem_result = await ssh_execute_command( hostname=hostname, username=username, password=password, command=cmd ) mem_data = json.loads(mem_result) if mem_data.get("exit_code") == 0: available_mb = int(mem_data.get("output", "0").split("Output:\n")[-1]) required_mb = requirements["memory_gb"] * 1024 memory_ok = available_mb >= required_mb checks_dict = results.get("checks", {}) if isinstance(checks_dict, dict): checks_dict["memory"] = { "required_mb": required_mb, "available_mb": available_mb, "status": "pass" if memory_ok else "fail", } if not memory_ok: results["requirements_met"] = False # Check disk space if "disk_gb" in requirements: cmd = "df / | tail -1 | awk '{print $4}'" disk_result = await ssh_execute_command( hostname=hostname, username=username, password=password, command=cmd ) disk_data = json.loads(disk_result) if disk_data.get("exit_code") == 0: available_kb = int(disk_data.get("output", "0").split("Output:\n")[-1]) required_kb = requirements["disk_gb"] * 1024 * 1024 disk_ok = available_kb >= required_kb checks_dict = results.get("checks", {}) if isinstance(checks_dict, dict): checks_dict["disk_space"] = { "required_gb": requirements["disk_gb"], "available_gb": round(available_kb / 1024 / 1024, 2), "status": "pass" if disk_ok else "fail", } if not disk_ok: results["requirements_met"] = False return results async def install_service( self, service_name: str, hostname: str, username: str = "mcp_admin", password: str | None = None, config_override: dict | None = None, ) -> dict[str, Any]: """Install a service on the target device.""" if service_name not in self.templates: return 
{"status": "error", "error": f"Unknown service: {service_name}"} service = self.templates[service_name] # Check requirements first req_check = await self.check_service_requirements( service_name, hostname, username, password ) if not req_check["requirements_met"]: return { "status": "error", "error": "Requirements not met", "requirement_check": req_check, } # Get installation method install_method = service.get("installation", {}).get("method", "docker-compose") if install_method == "docker-compose": return await self._install_docker_compose_service( service_name, service, hostname, username, password, config_override ) elif install_method == "script": return await self._install_script_service( service_name, service, hostname, username, password, config_override ) elif install_method == "terraform": return await self._install_terraform_service( service_name, service, hostname, username, password, config_override ) elif install_method == "ansible": return await self._install_ansible_service( service_name, service, hostname, username, password, config_override ) elif install_method == "iso_installation": return await self._install_iso_service( service_name, service, hostname, username, password, config_override ) else: return { "status": "error", "error": f"Unsupported installation method: {install_method}", } async def _install_docker_compose_service( self, service_name: str, service: dict, hostname: str, username: str, password: str | None, config_override: dict | None, ) -> dict[str, Any]: """Install a service using Docker Compose.""" results = { "service": service_name, "hostname": hostname, "installation_method": "docker-compose", "steps": [], } try: # Step 1: Ensure Docker is installed docker_check = await ssh_execute_command( hostname=hostname, username=username, password=password, command="docker --version", ) docker_data = json.loads(docker_check) if docker_data.get("exit_code") != 0: step_list = results.setdefault("steps", []) if isinstance(step_list, 
list): step_list.append( { "step": "check_docker", "status": "fail", "error": "Docker not installed", } ) return {"status": "error", "results": results} step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append({"step": "check_docker", "status": "success"}) # Step 2: Ensure Docker Compose is available compose_check = await ssh_execute_command( hostname=hostname, username=username, password=password, command="docker compose version", ) compose_data = json.loads(compose_check) if compose_data.get("exit_code") != 0: step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append( { "step": "check_docker_compose", "status": "fail", "error": "Docker Compose not available", } ) return {"status": "error", "results": results} step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append({"step": "check_docker_compose", "status": "success"}) # Step 3: Create service directory service_dir = f"/opt/{service_name}" mkdir_result = await ssh_execute_command( hostname=hostname, username=username, password=password, command=f"sudo mkdir -p {service_dir}", sudo=False, # sudo is included in command ) mkdir_data = json.loads(mkdir_result) if mkdir_data.get("exit_code") != 0: step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append( { "step": "create_directory", "status": "fail", "error": f"Failed to create {service_dir}", } ) return {"status": "error", "results": results} step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append( { "step": "create_directory", "status": "success", "directory": service_dir, } ) # Step 4: Generate docker-compose.yml compose_content = service["installation"]["docker_compose"] # Apply configuration overrides if config_override: # Simple merge for now - could be more sophisticated compose_content = self._merge_config(compose_content, config_override) # Write docker-compose.yml to remote system 
compose_yaml = yaml.dump(compose_content, default_flow_style=False) write_compose = await ssh_execute_command( hostname=hostname, username=username, password=password, command=f"sudo tee {service_dir}/docker-compose.yml > /dev/null << 'EOF'\n{compose_yaml}\nEOF", ) write_data = json.loads(write_compose) if write_data.get("exit_code") != 0: step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append( { "step": "write_compose_file", "status": "fail", "error": "Failed to write docker-compose.yml", } ) return {"status": "error", "results": results} step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append({"step": "write_compose_file", "status": "success"}) # Step 5: Start the service start_result = await ssh_execute_command( hostname=hostname, username=username, password=password, command=f"cd {service_dir} && sudo docker compose up -d", ) start_data = json.loads(start_result) if start_data.get("exit_code") != 0: step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append( { "step": "start_service", "status": "fail", "error": f"Failed to start service: {start_data.get('output', '')}", } ) return {"status": "error", "results": results} step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append( { "step": "start_service", "status": "success", "output": start_data.get("output", ""), } ) # Step 6: Verify service is running status_result = await ssh_execute_command( hostname=hostname, username=username, password=password, command=f"cd {service_dir} && sudo docker compose ps", ) status_data = json.loads(status_result) step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append( { "step": "verify_service", "status": "success" if status_data.get("exit_code") == 0 else "warning", "container_status": status_data.get("output", ""), } ) return { "status": "success", "service": service_name, "hostname": hostname, 
"access_url": f"http://{hostname}:{service.get('default_port', 8080)}", "installation_directory": service_dir, "results": results, } except Exception as e: step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append( {"step": "exception", "status": "fail", "error": str(e)} ) return {"status": "error", "results": results} async def _install_script_service( self, service_name: str, service: dict, hostname: str, username: str, password: str | None, config_override: dict | None, ) -> dict[str, Any]: """Install a service using shell scripts.""" # TODO: Implement script-based installation return { "status": "error", "error": "Script-based installation not yet implemented", } def _merge_config(self, base_config: dict, override: dict) -> dict: """Merge configuration override with base configuration.""" # Simple recursive merge - could be enhanced result = base_config.copy() for key, value in override.items(): if ( key in result and isinstance(result[key], dict) and isinstance(value, dict) ): result[key] = self._merge_config(result[key], value) else: result[key] = value return result async def get_service_status( self, service_name: str, hostname: str, username: str = "mcp_admin", password: str | None = None, ) -> dict[str, Any]: """Get the current status of an installed service.""" service_dir = f"/opt/{service_name}" # Check if service directory exists dir_check = await ssh_execute_command( hostname=hostname, username=username, password=password, command=f"test -d {service_dir}", ) dir_data = json.loads(dir_check) if dir_data.get("exit_code") != 0: return { "status": "not_installed", "service": service_name, "hostname": hostname, } # Check container status status_result = await ssh_execute_command( hostname=hostname, username=username, password=password, command=f"cd {service_dir} && sudo docker compose ps --format json", ) status_data = json.loads(status_result) return { "status": "installed", "service": service_name, "hostname": 
hostname, "container_status": status_data.get("output", ""), "service_directory": service_dir, } async def _install_terraform_service( self, service_name: str, service: dict, hostname: str, username: str, password: str | None, config_override: dict | None, ) -> dict[str, Any]: """Install a service using Terraform.""" results = { "service": service_name, "hostname": hostname, "installation_method": "terraform", "steps": [], } try: # Step 1: Check if Terraform is installed tf_check = await ssh_execute_command( hostname=hostname, username=username, password=password, command="terraform version", ) tf_data = json.loads(tf_check) if tf_data.get("exit_code") != 0: # Install Terraform install_cmd = """ wget -O- https://apt.releases.hashicorp.com/gpg | gpg --dearmor | sudo tee /usr/share/keyrings/hashicorp-archive-keyring.gpg echo "deb [signed-by=/usr/share/keyrings/hashicorp-archive-keyring.gpg] https://apt.releases.hashicorp.com $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/hashicorp.list sudo apt update && sudo apt install -y terraform """ install_result = await ssh_execute_command( hostname=hostname, username=username, password=password, command=install_cmd, sudo=True, ) install_data = json.loads(install_result) if install_data.get("exit_code") != 0: step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append( { "step": "install_terraform", "status": "fail", "error": "Failed to install Terraform", } ) return {"status": "error", "results": results} step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append({"step": "check_terraform", "status": "success"}) # Step 2: Create Terraform workspace tf_dir = f"/opt/terraform/{service_name}" mkdir_result = await ssh_execute_command( hostname=hostname, username=username, password=password, command=f"sudo mkdir -p {tf_dir} && sudo chown {username}:{username} {tf_dir}", sudo=True, ) step_list = results.setdefault("steps", []) if isinstance(step_list, 
list): step_list.append( { "step": "create_workspace", "status": "success" if json.loads(mkdir_result).get("exit_code") == 0 else "fail", } ) # Step 3: Generate Terraform files tf_config = service["installation"]["terraform"] # Generate main.tf main_tf_content = tf_config["main_tf"] # Replace template variables main_tf_content = main_tf_content.replace("{{hostname}}", hostname) main_tf_content = main_tf_content.replace("{{service_name}}", service_name) # Write main.tf await self._write_remote_file( hostname, username, password, f"{tf_dir}/main.tf", main_tf_content ) # Generate variables.tf if variables are defined if "variables" in tf_config: variables_tf = self._generate_variables_tf(tf_config["variables"]) await self._write_remote_file( hostname, username, password, f"{tf_dir}/variables.tf", variables_tf ) # Generate terraform.tfvars tfvars = self._generate_tfvars( tf_config["variables"], config_override, hostname, username, password, ) await self._write_remote_file( hostname, username, password, f"{tf_dir}/terraform.tfvars", tfvars ) # Generate backend configuration if "backend" in tf_config: backend_tf = self._generate_backend_tf( tf_config["backend"], service_name, hostname ) await self._write_remote_file( hostname, username, password, f"{tf_dir}/backend.tf", backend_tf ) step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append( {"step": "generate_terraform_files", "status": "success"} ) # Step 4: Terraform init init_result = await ssh_execute_command( hostname=hostname, username=username, password=password, command=f"cd {tf_dir} && terraform init -upgrade", ) init_data = json.loads(init_result) step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append( { "step": "terraform_init", "status": "success" if init_data.get("exit_code") == 0 else "fail", "output": init_data.get("output", ""), } ) if init_data.get("exit_code") != 0: return {"status": "error", "results": results} # Step 5: Terraform 
plan plan_result = await ssh_execute_command( hostname=hostname, username=username, password=password, command=f"cd {tf_dir} && terraform plan -out=tfplan", ) plan_data = json.loads(plan_result) step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append( { "step": "terraform_plan", "status": "success" if plan_data.get("exit_code") == 0 else "fail", } ) if plan_data.get("exit_code") != 0: return {"status": "error", "results": results} # Step 6: Terraform apply apply_result = await ssh_execute_command( hostname=hostname, username=username, password=password, command=f"cd {tf_dir} && terraform apply -auto-approve tfplan", ) apply_data = json.loads(apply_result) step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append( { "step": "terraform_apply", "status": "success" if apply_data.get("exit_code") == 0 else "fail", } ) if apply_data.get("exit_code") != 0: return {"status": "error", "results": results} # Step 7: Get outputs outputs_result = await ssh_execute_command( hostname=hostname, username=username, password=password, command=f"cd {tf_dir} && terraform output -json", ) outputs_data = json.loads(outputs_result) if outputs_data.get("exit_code") == 0: try: outputs = json.loads(outputs_data.get("output", "{}")) results["terraform_outputs"] = outputs except json.JSONDecodeError: pass # Step 8: Save state backup if configured if ( tf_config.get("state_management", {}) .get("backup", {}) .get("enabled", True) ): backup_result = await self._backup_terraform_state( hostname, username, password, f"{tf_dir}/terraform.tfstate", service_name, ) step_list = results.setdefault("steps", []) if isinstance(step_list, list): step_list.append( { "step": "backup_state", "status": "success" if backup_result else "fail", } ) return { "status": "success", "service": service_name, "hostname": hostname, "terraform_directory": tf_dir, "outputs": results.get("terraform_outputs", {}), "results": results, } except Exception 
        # NOTE(review): the opening `try:`/`except ... ` for this handler sits above
        # this chunk; only the tail is visible here. Reconstructed indentation assumes
        # it closes an install-step loop that accumulates per-step results.
        as e:
            # Record the failure as a synthetic "exception" step so callers see
            # partial progress alongside the error.
            step_list = results.setdefault("steps", [])
            if isinstance(step_list, list):
                step_list.append(
                    {"step": "exception", "status": "fail", "error": str(e)}
                )
            return {"status": "error", "results": results}

    async def _write_remote_file(
        self,
        hostname: str,
        username: str,
        password: str | None,
        remote_path: str,
        content: str,
    ) -> bool:
        """Write content to a remote file over SSH.

        Returns True when the remote shell reported exit code 0.
        """
        # Escape single quotes in content ('...' -> '"'"' trick) so the payload
        # survives being wrapped in single quotes on the remote shell.
        escaped_content = content.replace("'", "'\"'\"'")
        # Use printf to avoid issues with special characters (echo -e/backslash
        # handling varies between shells).
        write_cmd = f"printf '%s' '{escaped_content}' > {remote_path}"
        result = await ssh_execute_command(
            hostname=hostname, username=username, password=password, command=write_cmd
        )
        # ssh_execute_command returns a JSON string; presumably with at least
        # "exit_code" and "output" keys — TODO confirm against ssh_tools.
        data = json.loads(result)
        return bool(data.get("exit_code") == 0)

    def _generate_variables_tf(self, variables: dict) -> str:
        """Generate variables.tf file from template variables.

        Each entry in ``variables`` maps a variable name to a config dict that
        may contain "type", "default", "description", "sensitive", and
        "validation" (a list of allowed values).
        """
        # NOTE(review): in-string HCL indentation below was reconstructed from a
        # whitespace-collapsed source — cosmetic only, verify against git history.
        content = []
        for var_name, var_config in variables.items():
            var_block = f'variable "{var_name}" {{\n'
            if "type" in var_config:
                var_block += f"  type = {var_config['type']}\n"
            if "default" in var_config:
                default_val = var_config["default"]
                # Render the default in HCL-compatible syntax per Python type.
                if isinstance(default_val, str):
                    var_block += f'  default = "{default_val}"\n'
                elif isinstance(default_val, bool):
                    # HCL booleans are lowercase true/false.
                    var_block += f"  default = {str(default_val).lower()}\n"
                elif isinstance(default_val, list):
                    # JSON list syntax is valid HCL for simple lists.
                    var_block += f"  default = {json.dumps(default_val)}\n"
                else:
                    var_block += f"  default = {default_val}\n"
            if "description" in var_config:
                var_block += f'  description = "{var_config["description"]}"\n'
            if "sensitive" in var_config and var_config["sensitive"]:
                var_block += "  sensitive = true\n"
            if "validation" in var_config:
                # Emit a contains() validation restricting the variable to the
                # listed values.
                var_block += "  validation {\n"
                validation_values = ", ".join(
                    [f'"{v}"' for v in var_config["validation"]]
                )
                var_block += (
                    f"    condition = contains([{validation_values}], var.{var_name})\n"
                )
                var_block += f'    error_message = "Invalid value for {var_name}. Must be one of: {", ".join(var_config["validation"])}"\n'
                var_block += "  }\n"
            var_block += "}\n"
            content.append(var_block)
        return "\n".join(content)

    def _generate_tfvars(
        self,
        variables: dict,
        overrides: dict | None,
        hostname: str,
        username: str,
        password: str | None,
    ) -> str:
        """Generate terraform.tfvars file with actual values.

        Precedence (lowest to highest): template defaults, SSH credentials,
        then ``overrides``.
        """
        tfvars = {}
        # Process template variables
        for var_name, var_config in variables.items():
            if "default" in var_config:
                default = var_config["default"]
                # Replace template variables ({{hostname}}, {{service_name}})
                # embedded in string defaults.
                if isinstance(default, str):
                    default = default.replace("{{hostname}}", hostname)
                    default = default.replace(
                        "{{service_name}}",
                        # service_name may not be set on self at this point.
                        self.service_name if hasattr(self, "service_name") else "",
                    )
                tfvars[var_name] = default
        # Add SSH credentials for Terraform to use
        tfvars["ssh_user"] = username
        if password:
            tfvars["ssh_password"] = password
        # Apply overrides (wins over defaults and credentials)
        if overrides:
            tfvars.update(overrides)
        # Generate tfvars content, one `key = value` assignment per line.
        content = []
        for key, value in tfvars.items():
            if isinstance(value, str):
                content.append(f'{key} = "{value}"')
            elif isinstance(value, bool):
                content.append(f"{key} = {str(value).lower()}")
            elif isinstance(value, list):
                content.append(f"{key} = {json.dumps(value)}")
            else:
                content.append(f"{key} = {value}")
        return "\n".join(content)

    def _generate_backend_tf(
        self, backend_config: dict, service_name: str, hostname: str
    ) -> str:
        """Generate backend.tf for state management.

        Supports "local" and "s3" backend types; anything else falls back to a
        local state file under /opt/terraform-states.
        """
        backend_type = backend_config.get("type", "local")
        if backend_type == "local":
            path = backend_config.get(
                "path", f"/opt/terraform-states/{service_name}-{hostname}.tfstate"
            )
            # Replace template variables in a caller-supplied path as well.
            path = path.replace("{{service_name}}", service_name)
            path = path.replace("{{hostname}}", hostname)
            return f"""terraform {{
  backend "local" {{
    path = "{path}"
  }}
}}"""
        elif backend_type == "s3":
            config = backend_config.get("config", {})
            bucket = config.get("bucket", "terraform-states")
            key = config.get(
                "key", f"services/{service_name}/{hostname}/terraform.tfstate"
            )
            # Replace template variables
            key = key.replace("{{service_name}}", service_name)
            key = key.replace("{{hostname}}", hostname)
            return f"""terraform {{
  backend "s3" {{
    bucket = "{bucket}"
    key = "{key}"
    region = "{config.get("region", "us-east-1")}"
    encrypt = {str(config.get("encrypt", True)).lower()}
  }}
}}"""
        else:
            # Default to local backend
            return f"""terraform {{
  backend "local" {{
    path = "/opt/terraform-states/{service_name}-{hostname}.tfstate"
  }}
}}"""

    async def _backup_terraform_state(
        self,
        hostname: str,
        username: str,
        password: str | None,
        state_path: str,
        service_name: str,
    ) -> bool:
        """Backup Terraform state file.

        Copies ``state_path`` into /opt/terraform-state-backups with a
        timestamped name; returns True when the copy succeeded.
        """
        backup_dir = "/opt/terraform-state-backups"
        # Deliberately a shell command substitution: the timestamp is expanded
        # on the remote host at copy time, not here.
        timestamp = "$(date +%Y%m%d-%H%M%S)"
        backup_path = f"{backup_dir}/{service_name}-{timestamp}.tfstate"
        # Create backup directory (result intentionally ignored; the copy below
        # is the authoritative success signal)
        mkdir_cmd = f"sudo mkdir -p {backup_dir} && sudo chown {username}:{username} {backup_dir}"
        await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command=mkdir_cmd,
            sudo=True,
        )
        # Copy state file to backup
        backup_cmd = f"cp {state_path} {backup_path}"
        result = await ssh_execute_command(
            hostname=hostname, username=username, password=password, command=backup_cmd
        )
        data = json.loads(result)
        return bool(data.get("exit_code") == 0)

    async def destroy_terraform_service(
        self,
        service_name: str,
        hostname: str,
        username: str = "mcp_admin",
        password: str | None = None,
    ) -> dict[str, Any]:
        """Destroy a Terraform-managed service.

        Runs `terraform destroy -auto-approve` in the service's working
        directory and removes the directory on success.
        """
        tf_dir = f"/opt/terraform/{service_name}"
        # Check if Terraform directory exists
        dir_check = await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command=f"test -d {tf_dir}",
        )
        dir_data = json.loads(dir_check)
        if dir_data.get("exit_code") != 0:
            return {
                "status": "error",
                "error": f"Terraform directory not found: {tf_dir}",
            }
        # Run terraform destroy
        destroy_result = await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command=f"cd {tf_dir} && terraform destroy -auto-approve",
        )
        destroy_data = json.loads(destroy_result)
        # Clean up Terraform directory if destroy succeeded
        if destroy_data.get("exit_code") == 0:
            await ssh_execute_command(
                hostname=hostname,
                username=username,
                password=password,
                command=f"sudo rm -rf {tf_dir}",
                sudo=True,
            )
        return {
            "status": "success" if destroy_data.get("exit_code") == 0 else "error",
            "service": service_name,
            "action": "destroy",
            "output": destroy_data.get("output", ""),
        }

    async def plan_terraform_service(
        self,
        service_name: str,
        hostname: str,
        username: str = "mcp_admin",
        password: str | None = None,
        config_override: dict | None = None,
    ) -> dict[str, Any]:
        """Generate a Terraform plan without applying changes.

        If the service is not yet installed there is nothing to plan against,
        so an informational result is returned instead.
        """
        if service_name not in self.templates:
            return {"status": "error", "error": f"Unknown service: {service_name}"}
        # NOTE(review): lookup result is discarded — looks like a leftover from
        # an earlier version; the template isn't needed just to run `plan`.
        self.templates[service_name]
        tf_dir = f"/opt/terraform/{service_name}"
        # Check if already installed
        dir_check = await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command=f"test -d {tf_dir}",
        )
        dir_data = json.loads(dir_check)
        if dir_data.get("exit_code") == 0:
            # Service exists, just run plan
            plan_result = await ssh_execute_command(
                hostname=hostname,
                username=username,
                password=password,
                command=f"cd {tf_dir} && terraform plan",
            )
            plan_data = json.loads(plan_result)
            return {
                "status": "success" if plan_data.get("exit_code") == 0 else "error",
                "service": service_name,
                "action": "plan",
                "existing": True,
                "output": plan_data.get("output", ""),
            }
        else:
            # Service doesn't exist, need to set up first
            return {
                "status": "info",
                "service": service_name,
                "action": "plan",
                "existing": False,
                "message": "Service not installed. Run install_service first to see the plan.",
            }

    async def refresh_terraform_service(
        self,
        service_name: str,
        hostname: str,
        username: str = "mcp_admin",
        password: str | None = None,
    ) -> dict[str, Any]:
        """Refresh Terraform state and detect drift.

        Runs `terraform refresh` then `terraform plan -detailed-exitcode`;
        plan exit code 2 means pending changes (drift).
        """
        tf_dir = f"/opt/terraform/{service_name}"
        # Check if Terraform directory exists
        dir_check = await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command=f"test -d {tf_dir}",
        )
        dir_data = json.loads(dir_check)
        if dir_data.get("exit_code") != 0:
            return {"status": "error", "error": f"Service not found: {service_name}"}
        # Run terraform refresh
        refresh_result = await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command=f"cd {tf_dir} && terraform refresh",
        )
        refresh_data = json.loads(refresh_result)
        # Check for drift with plan
        plan_result = await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command=f"cd {tf_dir} && terraform plan -detailed-exitcode",
        )
        plan_data = json.loads(plan_result)
        # Exit code 2 means there are changes (0 = in sync, 1 = error)
        has_changes = plan_data.get("exit_code") == 2
        return {
            "status": "drift_detected" if has_changes else "in_sync",
            "service": service_name,
            "has_changes": has_changes,
            "refresh_output": refresh_data.get("output", ""),
            "plan_output": plan_data.get("output", "") if has_changes else None,
        }

    async def _install_ansible_service(
        self,
        service_name: str,
        service: dict,
        hostname: str,
        username: str,
        password: str | None,
        config_override: dict | None,
    ) -> dict[str, Any]:
        """Install a service using Ansible playbook.

        Creates /opt/ansible/<service> on the target, writes an inventory,
        playbook, and (optionally) group vars, installs Ansible if missing,
        then runs the playbook.
        """
        ansible_config = service.get("installation", {}).get("ansible", {})
        if not ansible_config:
            return {
                "status": "error",
                "error": "No Ansible configuration found in service template",
            }
        # Create Ansible directory structure
        ansible_dir = f"/opt/ansible/{service_name}"
        # NOTE(review): line breaks inside this shell snippet were reconstructed
        # from a whitespace-collapsed source; semantics are unchanged.
        setup_cmd = f"""
sudo mkdir -p {ansible_dir}/playbooks {ansible_dir}/inventory {ansible_dir}/group_vars {ansible_dir}/host_vars &&
sudo chown -R {username}:{username} {ansible_dir}
"""
        setup_result = await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command=setup_cmd,
            sudo=True,
        )
        setup_data = json.loads(setup_result)
        if setup_data.get("exit_code") != 0:
            return {
                "status": "error",
                "error": "Failed to create Ansible directory structure",
                "output": setup_data.get("output", ""),
            }
        # Generate inventory file (quoted heredoc so remote shell does not
        # expand anything inside the content)
        inventory_content = self._generate_ansible_inventory(
            hostname, username, config_override
        )
        inventory_result = await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command=f"cat > {ansible_dir}/inventory/hosts << 'EOF'\n{inventory_content}\nEOF",
        )
        inventory_data = json.loads(inventory_result)
        if inventory_data.get("exit_code") != 0:
            return {
                "status": "error",
                "error": "Failed to create inventory file",
                "output": inventory_data.get("output", ""),
            }
        # Generate playbook
        playbook_content = self._generate_ansible_playbook(
            service, service_name, config_override
        )
        playbook_result = await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command=f"cat > {ansible_dir}/playbooks/{service_name}.yml << 'EOF'\n{playbook_content}\nEOF",
        )
        playbook_data = json.loads(playbook_result)
        if playbook_data.get("exit_code") != 0:
            return {
                "status": "error",
                "error": "Failed to create playbook",
                "output": playbook_data.get("output", ""),
            }
        # Generate group variables (best-effort: result not checked)
        if config_override or service.get("default_config"):
            vars_content = self._generate_ansible_vars(service, config_override)
            await ssh_execute_command(
                hostname=hostname,
                username=username,
                password=password,
                command=f"cat > {ansible_dir}/group_vars/all.yml << 'EOF'\n{vars_content}\nEOF",
            )
        # Install Ansible if not present
        ansible_check = await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command="which ansible-playbook",
        )
        ansible_data = json.loads(ansible_check)
        if ansible_data.get("exit_code") != 0:
            # Install Ansible via whichever package manager the target has.
            install_cmd = """
if command -v apt-get >/dev/null 2>&1; then
    sudo apt-get update && sudo apt-get install -y ansible
elif command -v yum >/dev/null 2>&1; then
    sudo yum install -y epel-release && sudo yum install -y ansible
elif command -v dnf >/dev/null 2>&1; then
    sudo dnf install -y ansible
elif command -v pacman >/dev/null 2>&1; then
    sudo pacman -S --noconfirm ansible
else
    echo "Unsupported package manager for Ansible installation"
    exit 1
fi
"""
            install_result = await ssh_execute_command(
                hostname=hostname,
                username=username,
                password=password,
                command=install_cmd,
                sudo=True,
            )
            install_data = json.loads(install_result)
            if install_data.get("exit_code") != 0:
                return {
                    "status": "error",
                    "error": "Failed to install Ansible",
                    "output": install_data.get("output", ""),
                }
        # Run the playbook
        run_cmd = f"cd {ansible_dir} && ansible-playbook -i inventory/hosts playbooks/{service_name}.yml"
        # Add verbose output if in debug mode
        if config_override and config_override.get("debug", False):
            run_cmd += " -vvv"
        run_result = await ssh_execute_command(
            hostname=hostname, username=username, password=password, command=run_cmd
        )
        run_data = json.loads(run_result)
        return {
            "status": "success" if run_data.get("exit_code") == 0 else "error",
            "service": service_name,
            "method": "ansible",
            "output": run_data.get("output", ""),
            "ansible_dir": ansible_dir,
            "playbook_path": f"{ansible_dir}/playbooks/{service_name}.yml",
        }

    def _generate_ansible_inventory(
        self, hostname: str, username: str, config_override: dict | None
    ) -> str:
        """Generate Ansible inventory file.

        NOTE(review): a similar module-level generate_ansible_inventory exists
        at the top of this file using an [servers] group; this method uses
        [homelab] — confirm both are intentional.
        """
        inventory = f"""[homelab]
{hostname} ansible_user={username} ansible_ssh_private_key_file=~/.ssh/mcp_admin_rsa

[homelab:vars]
ansible_python_interpreter=/usr/bin/python3
ansible_ssh_common_args='-o StrictHostKeyChecking=no'
"""
        # Add any custom inventory variables
        if config_override and "inventory_vars" in config_override:
            inventory += "\n"
            for key, value in config_override["inventory_vars"].items():
                inventory += f"{key}={value}\n"
        return inventory

    def _generate_ansible_playbook(
        self, service: dict, service_name: str, config_override: dict | None
    ) -> str:
        """Generate Ansible playbook from service template.

        Emits pre_tasks/tasks/post_tasks in order under `tasks:` plus an
        optional `handlers:` section.
        """
        ansible_config = service.get("installation", {}).get("ansible", {})
        # Base playbook structure (in-string YAML indentation reconstructed
        # from a whitespace-collapsed source).
        playbook = f"""---
- name: Deploy {service.get("name", service_name)}
  hosts: homelab
  become: yes
  gather_facts: yes

  vars:
    service_name: {service_name}
    service_description: "{service.get("description", "")}"

  tasks:
"""
        # Add pre-tasks if defined
        if "pre_tasks" in ansible_config:
            playbook += "    # Pre-installation tasks\n"
            for task in ansible_config["pre_tasks"]:
                playbook += self._format_ansible_task(task, 4)
        # Add main tasks
        if "tasks" in ansible_config:
            playbook += "    # Main installation tasks\n"
            for task in ansible_config["tasks"]:
                playbook += self._format_ansible_task(task, 4)
        # Add post-tasks if defined
        if "post_tasks" in ansible_config:
            playbook += "    # Post-installation tasks\n"
            for task in ansible_config["post_tasks"]:
                playbook += self._format_ansible_task(task, 4)
        # Add handlers if defined
        if "handlers" in ansible_config:
            playbook += "\n  handlers:\n"
            for handler in ansible_config["handlers"]:
                playbook += self._format_ansible_task(handler, 4)
        return playbook

    def _format_ansible_task(self, task: dict, indent: int) -> str:
        """Format an Ansible task with proper indentation.

        ``indent`` is the number of leading spaces for the task's `- name:`
        line; nested keys/values are indented relative to it.
        """
        spaces = " " * indent
        task_str = f"{spaces}- name: {task.get('name', 'Unnamed task')}\n"
        for key, value in task.items():
            if key == "name":
                continue  # already emitted above
            if isinstance(value, dict):
                task_str += f"{spaces}  {key}:\n"
                for k, v in value.items():
                    task_str += f"{spaces}    {k}: {v}\n"
            elif isinstance(value, list):
                task_str += f"{spaces}  {key}:\n"
                for item in value:
                    task_str += f"{spaces}    - {item}\n"
            else:
                task_str += f"{spaces}  {key}: {value}\n"
        return task_str + "\n"

    def _generate_ansible_vars(
        self, service: dict, config_override: dict | None
    ) -> str:
        """Generate Ansible variables file (group_vars/all.yml content)."""
        vars_content = "---\n# Service configuration variables\n\n"
        # Add default configuration from the service template.
        default_config = service.get("default_config", {})
        for key, value in default_config.items():
            # yaml.dump serializes arbitrary values; strip the trailing newline
            # it appends.
            vars_content += f"{key}: {yaml.dump(value).strip()}\n"
        # Add configuration overrides
        if config_override:
            vars_content += "\n# Configuration overrides\n"
            for key, value in config_override.items():
                if key not in ["debug", "inventory_vars"]:  # Skip special keys
                    vars_content += f"{key}: {yaml.dump(value).strip()}\n"
        return vars_content

    async def check_ansible_service(
        self,
        service_name: str,
        hostname: str,
        username: str = "mcp_admin",
        password: str | None = None,
    ) -> dict[str, Any]:
        """Check the status of an Ansible-managed service.

        Verifies the deployment directory and playbook exist and reports
        whether ansible-playbook is installed on the target.
        """
        ansible_dir = f"/opt/ansible/{service_name}"
        # Check if Ansible directory exists
        dir_check = await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command=f"test -d {ansible_dir}",
        )
        dir_data = json.loads(dir_check)
        if dir_data.get("exit_code") != 0:
            return {
                "status": "not_found",
                "service": service_name,
                "error": f"Ansible deployment not found: {ansible_dir}",
            }
        # Check playbook exists
        playbook_path = f"{ansible_dir}/playbooks/{service_name}.yml"
        playbook_check = await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command=f"test -f {playbook_path}",
        )
        playbook_data = json.loads(playbook_check)
        if playbook_data.get("exit_code") != 0:
            return {
                "status": "error",
                "service": service_name,
                "error": f"Playbook not found: {playbook_path}",
            }
        # Get file information (listing is informational only)
        info_result = await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command=f"ls -la {ansible_dir}/playbooks/{service_name}.yml {ansible_dir}/inventory/hosts",
        )
        info_data = json.loads(info_result)
        # Check if Ansible is installed
        ansible_check = await ssh_execute_command(
            hostname=hostname,
            username=username,
            password=password,
            command="ansible-playbook --version",
        )
        ansible_data = json.loads(ansible_check)
        ansible_installed = ansible_data.get("exit_code") == 0
        return {
            "status": "deployed",
            "service": service_name,
            "ansible_dir": ansible_dir,
            "playbook_path": playbook_path,
            "ansible_installed": ansible_installed,
            # First line of `ansible-playbook --version` output carries the
            # version string.
            "ansible_version": ansible_data.get("output", "").split("\n")[0]
            if ansible_installed
            else None,
            "files_info": info_data.get("output", ""),
        }

    async def run_ansible_playbook(
        self,
        service_name: str,
        hostname: str,
        username: str = "mcp_admin",
        password: str | None = None,
        tags: list[str] | None = None,
        extra_vars: dict | None = None,
        check_mode: bool = False,
    ) -> dict[str, Any]:
        """Run an existing Ansible playbook for a service.

        ``check_mode`` maps to `--check` (dry run), ``tags`` to `--tags`,
        and ``extra_vars`` to repeated `--extra-vars key=value` flags.
        """
        ansible_dir = f"/opt/ansible/{service_name}"
        # Check if playbook exists; propagate the failure result unchanged.
        check_result = await self.check_ansible_service(
            service_name, hostname, username, password
        )
        if check_result["status"] != "deployed":
            return check_result
        # Build ansible-playbook command
        cmd = f"cd {ansible_dir} && ansible-playbook -i inventory/hosts playbooks/{service_name}.yml"
        # Add check mode if requested
        if check_mode:
            cmd += " --check"
        # Add tags if specified
        if tags:
            cmd += f" --tags {','.join(tags)}"
        # Add extra variables if specified
        if extra_vars:
            for key, value in extra_vars.items():
                cmd += f" --extra-vars '{key}={value}'"
        # Add verbose output
        cmd += " -v"
        # Run the playbook
        run_result = await ssh_execute_command(
            hostname=hostname, username=username, password=password, command=cmd
        )
        run_data = json.loads(run_result)
        return {
            "status": "success" if run_data.get("exit_code") == 0 else "error",
            "service": service_name,
            "action": "check" if check_mode else "run",
            "command": cmd,
            "output": run_data.get("output", ""),
            "exit_code": run_data.get("exit_code"),
        }

    async def _install_iso_service(
        self,
        service_name: str,
        service: dict,
        hostname: str,
        username: str,
        password: str | None,
        config_override: dict | None,
    ) -> dict[str, Any]:
        """Handle ISO-based installation (like TrueNAS, Proxmox, etc.).

        Does not run anything on the target: returns guidance describing the
        manual install steps from the service template.
        """
        # ISO installations typically require manual intervention or VM/bare metal setup
        # This method provides guidance rather than automated installation
        installation_config = service.get("installation", {})
        download_url = installation_config.get("download_url")
        installation_steps = installation_config.get("installation_steps", [])
        requirements = installation_config.get("requirements", [])
        return {
            "status": "guidance",
            "service": service_name,
            "method": "iso_installation",
            "message": "ISO-based installations require manual setup",
            "guidance": {
                "download_url": download_url,
                "requirements": requirements,
                "installation_steps": installation_steps,
                "installation_type": installation_config.get(
                    "installation_type", "bare_metal_or_vm"
                ),
                "target_hostname": hostname,
                "next_steps": [
                    "Download the ISO from the provided URL",
                    "Create bootable media or mount ISO in VM",
                    "Boot target system from installation media",
                    "Follow the provided installation steps",
                    "Configure network settings to match target hostname",
                    "Complete post-installation configuration",
                ],
            },
        }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/washyu/mcp_python_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.