Skip to main content
Glama
deployment.py22.3 kB
"""Handler for deployment status, logs, and recovery operations.""" from datetime import datetime, timedelta from typing import Any from ludus_mcp.core.client import LudusAPIClient from ludus_mcp.schemas.orchestration import DeploymentTimeline, DeploymentStep from ludus_mcp.utils.logging import get_logger from ludus_mcp.utils.visualization import format_deployment_status logger = get_logger(__name__) class DeploymentHandler: """Handler for deployment monitoring and recovery.""" def __init__(self, client: LudusAPIClient) -> None: """Initialize the deployment handler.""" self.client = client async def quick_status(self, user_id: str | None = None) -> str: """ Get one-line deployment status with emoji indicators. Args: user_id: Optional user ID Returns: Formatted status string """ try: range_info = await self.client.get_range(user_id) state = range_info.get("rangeState", "UNKNOWN") vm_count = range_info.get("numberOfVMs", 0) # Status indicator status_indicator = { "SUCCESS": "[OK]", "DEPLOYING": "[...]", "FAILED": "[ERROR]", "DELETED": "[DELETED]", "UNKNOWN": "[?]" } indicator = status_indicator.get(state, "[?]") # Build status line status_line = f"{indicator} Range Status: {state}" if vm_count > 0: status_line += f" | VMs: {vm_count}" # If deploying, try to estimate progress if state == "DEPLOYING": # Count ready VMs vms = range_info.get("VMs", []) ready_count = sum(1 for vm in vms if vm.get("status") == "running") if ready_count > 0: status_line += f" ({ready_count}/{vm_count} ready)" # Add deployment time if available last_deployment = range_info.get("lastDeployment") if last_deployment: status_line += f" | Deployed: {last_deployment}" logger.debug(f"Quick status: {status_line}") return status_line except Exception as e: logger.error(f"Error getting quick status: {e}") return f"[ERROR] Error getting status: {str(e)}" async def get_deployment_status(self, user_id: str | None = None) -> dict[str, Any]: """Get current deployment status with detailed information.""" try: range_info = await self.client.get_range(user_id) range_config = await self.client.get_range_config(user_id) logs = await self.client.get_range_logs(user_id) return { "status": "success", "range": range_info, "config": range_config, "range_state": range_info.get("rangeState", "UNKNOWN"), "number_of_vms": range_info.get("numberOfVMs", 0), "vms": range_info.get("VMs", []), "last_deployment": range_info.get("lastDeployment"), "testing_enabled": range_info.get("testingEnabled", False), "logs_preview": logs[:2000] if logs else "No logs available", # First 2000 chars "has_logs": bool(logs), } except Exception as e: logger.error(f"Error getting deployment status: {e}") return { "status": "error", "error": str(e), } async def get_full_logs(self, user_id: str | None = None) -> dict[str, Any]: """Get full deployment logs.""" try: logs = await self.client.get_range_logs(user_id) range_info = await self.client.get_range(user_id) return { "status": "success", "range_state": range_info.get("rangeState", "UNKNOWN"), "logs": logs, "log_length": len(logs) if logs else 0, } except Exception as e: logger.error(f"Error getting logs: {e}") return { "status": "error", "error": str(e), } async def check_deployment_health(self, user_id: str | None = None) -> dict[str, Any]: """Check deployment health and identify issues.""" try: range_info = await self.client.get_range(user_id) logs = await self.client.get_range_logs(user_id) range_state = range_info.get("rangeState", "UNKNOWN") issues = [] warnings = [] recommendations = [] # Check range state if range_state == "FAILED": issues.append("Range deployment has failed") recommendations.append("Check logs for specific errors") recommendations.append("Consider aborting and redeploying") elif range_state == "DEPLOYING": # Check for common issues in logs if logs: log_lower = logs.lower() # Check for AD Web Services issues if "active directory web services" in log_lower or "adws" in log_lower or "unable to find a default server with active directory web services" in log_lower: issues.append("Active Directory Web Services (ADWS) not running") recommendations.append("This is normal during initial DC setup - ADWS starts after domain promotion") recommendations.append("Wait 5-10 minutes for AD services to fully initialize") recommendations.append("Ludus will automatically retry - no action needed") recommendations.append("Check if DC VM is fully booted: ludus.get_range()") warnings.append("ADWS startup can take several minutes - deployment may still succeed") # Check for Ansible failures if "failed:" in log_lower: # Check if it's a recoverable failure (like ADWS) if "active directory web services" not in log_lower: issues.append("Ansible playbook failures detected") recommendations.append("Review Ansible errors in logs") recommendations.append("Check VM connectivity and services") if "unreachable" in log_lower: recommendations.append("VMs may not be fully booted - wait and retry") recommendations.append("Check VM power state: ludus.get_range()") # Check for timeout issues if "timeout" in log_lower or "timed out" in log_lower: warnings.append("Timeouts detected in deployment") recommendations.append("Deployment may be slow but still progressing") # Check for network issues if "connection refused" in log_lower or "no route to host" in log_lower: issues.append("Network connectivity issues detected") recommendations.append("Check network configuration and VM networking") health_status = "healthy" if issues: health_status = "unhealthy" elif warnings: health_status = "degraded" return { "status": "success", "health_status": health_status, "range_state": range_state, "issues": issues, "warnings": warnings, "recommendations": recommendations, "number_of_vms": range_info.get("numberOfVMs", 0), "expected_vms": len(range_info.get("VMs", [])), } except Exception as e: logger.error(f"Error checking deployment health: {e}") return { "status": "error", "error": str(e), } async def resume_deployment(self, user_id: str | None = None) -> dict[str, Any]: """Attempt to resume a failed or stuck deployment. Note: Ludus handles deployment resumption automatically. This function checks if deployment can be resumed and provides guidance. """ try: range_info = await self.client.get_range(user_id) range_state = range_info.get("rangeState", "UNKNOWN") if range_state == "SUCCESS": return { "status": "info", "message": "Range deployment is already successful", "range_state": range_state, } if range_state == "DEPLOYING": return { "status": "info", "message": "Deployment is already in progress. Ludus will automatically retry failed tasks.", "range_state": range_state, "note": "Ludus automatically resumes deployment. No action needed.", } if range_state == "FAILED": # Get current config and logs to analyze try: config = await self.client.get_range_config(user_id) logs = await self.client.get_range_logs(user_id) # Check if failure is recoverable recoverable = False recovery_note = "" if logs: log_lower = logs.lower() # ADWS failures are often recoverable - just need to wait if "active directory web services" in log_lower: recoverable = True recovery_note = "AD Web Services issue - can be resolved by waiting or retrying" options = [ "1. Review logs: ludus.get_full_logs", "2. Check deployment health: ludus.check_deployment_health", ] if recoverable: options.append("3. Wait for services to start, then retry deployment") options.append("4. Or redeploy with current config: ludus.deploy_range") else: options.append("3. Fix configuration if needed") options.append("4. Redeploy with updated config: ludus.deploy_range") options.append("5. Or delete and recreate the range: ludus.delete_range") return { "status": "action_required", "message": "Deployment has failed. Options:", "range_state": range_state, "recoverable": recoverable, "recovery_note": recovery_note, "options": options, "current_config_available": bool(config), "note": "Ludus will decide if range should be destroyed. Check should_destroy_range for recommendations.", } except Exception as e: return { "status": "error", "message": "Failed to get range config for recovery", "error": str(e), } return { "status": "unknown", "message": f"Unknown range state: {range_state}", "range_state": range_state, } except Exception as e: logger.error(f"Error resuming deployment: {e}") return { "status": "error", "error": str(e), } async def should_destroy_range(self, user_id: str | None = None) -> dict[str, Any]: """Analyze if range should be destroyed based on deployment state. This provides recommendations - Ludus will handle actual destruction decisions. """ try: range_info = await self.client.get_range(user_id) logs = await self.client.get_range_logs(user_id) range_state = range_info.get("rangeState", "UNKNOWN") recommendations = [] should_destroy = False reason = "" if range_state == "FAILED": # Check if failure is recoverable if logs: log_lower = logs.lower() # Check for critical failures if "fatal" in log_lower or "critical error" in log_lower: should_destroy = True reason = "Critical fatal errors detected in deployment" recommendations.append("Destroy and redeploy with corrected configuration") elif "unrecoverable" in log_lower: should_destroy = True reason = "Unrecoverable errors detected" recommendations.append("Destroy and start fresh") else: # Check if it's a transient issue if "timeout" in log_lower or "connection" in log_lower: should_destroy = False reason = "Transient errors detected - may be recoverable" recommendations.append("Wait and monitor - Ludus may auto-retry") recommendations.append("Check if VMs are accessible") else: should_destroy = True reason = "Deployment failed with errors" recommendations.append("Review logs to identify root cause") recommendations.append("Fix configuration and redeploy") else: should_destroy = True reason = "Deployment failed - no logs available for analysis" recommendations.append("Destroy and redeploy") elif range_state == "DEPLOYING": should_destroy = False reason = "Deployment in progress - do not destroy" recommendations.append("Monitor deployment status") recommendations.append("Check logs for progress") elif range_state == "SUCCESS": should_destroy = False reason = "Deployment successful - range is operational" else: should_destroy = False reason = f"Unknown state: {range_state}" recommendations.append("Check range status manually") return { "status": "success", "should_destroy": should_destroy, "reason": reason, "range_state": range_state, "recommendations": recommendations, "note": "Ludus will make the final decision on range lifecycle. This is advisory only.", } except Exception as e: logger.error(f"Error analyzing range destruction: {e}") return { "status": "error", "error": str(e), } async def get_deployment_timeline(self, user_id: str | None = None) -> DeploymentTimeline: """ Get deployment timeline with progress tracking. Args: user_id: Optional user ID Returns: DeploymentTimeline with progress information """ logger.debug("Getting deployment timeline") try: range_info = await self.client.get_range(user_id) range_state = range_info.get("rangeState", "UNKNOWN") vms = range_info.get("VMs", []) vm_count = len(vms) # Get logs to parse timeline logs = await self.client.get_range_logs(user_id) # Build timeline steps from logs and current state steps = [] started_at = datetime.now() - timedelta(minutes=10) # Estimate # Parse logs for timeline events if logs: # Step 1: VMs created if "creating" in logs.lower() or vm_count > 0: steps.append(DeploymentStep( name="VMs Created", status="completed", started_at=started_at, completed_at=started_at + timedelta(minutes=2), message=f"{vm_count} VMs created" )) # Step 2: Networks configured if "network" in logs.lower(): steps.append(DeploymentStep( name="Networks Configured", status="completed" if range_state != "DEPLOYING" else "completed", started_at=started_at + timedelta(minutes=2), completed_at=started_at + timedelta(minutes=4), message="Network topology configured" )) # Step 3: VM provisioning vms_ready = sum(1 for vm in vms if vm.get("status") == "running") if vms_ready > 0: status = "completed" if vms_ready == vm_count else "in_progress" steps.append(DeploymentStep( name="VM Provisioning", status=status, started_at=started_at + timedelta(minutes=4), completed_at=started_at + timedelta(minutes=10) if status == "completed" else None, message=f"{vms_ready}/{vm_count} VMs provisioned" )) # Step 4: Domain controller setup (if AD lab) if "domain" in logs.lower() or "dc" in logs.lower(): dc_complete = "successfully" in logs.lower() and "domain" in logs.lower() steps.append(DeploymentStep( name="Domain Controller Setup", status="completed" if dc_complete else "in_progress", started_at=started_at + timedelta(minutes=6), completed_at=started_at + timedelta(minutes=12) if dc_complete else None, message="Configuring Active Directory" )) # Step 5: Domain join (if applicable) if "join" in logs.lower(): join_complete = range_state == "SUCCESS" steps.append(DeploymentStep( name="Domain Join", status="completed" if join_complete else "in_progress", started_at=started_at + timedelta(minutes=12), completed_at=started_at + timedelta(minutes=15) if join_complete else None, message="Joining VMs to domain" )) # Step 6: SIEM setup (if applicable) if "wazuh" in logs.lower() or "siem" in logs.lower(): siem_complete = range_state == "SUCCESS" steps.append(DeploymentStep( name="SIEM Setup", status="completed" if siem_complete else "in_progress", started_at=started_at + timedelta(minutes=14), completed_at=started_at + timedelta(minutes=18) if siem_complete else None, message="Configuring SIEM monitoring" )) # Calculate progress if range_state == "SUCCESS": progress = 100 current_step = "Deployment Complete" elif range_state == "FAILED": progress = 0 current_step = "Deployment Failed" elif steps: completed_steps = sum(1 for s in steps if s.status == "completed") progress = int((completed_steps / len(steps)) * 100) if steps else 0 in_progress_steps = [s for s in steps if s.status == "in_progress"] current_step = in_progress_steps[0].name if in_progress_steps else "Initializing..." else: progress = 5 current_step = "Initializing..." # Calculate timing elapsed_minutes = 10 # Estimate if range_state == "DEPLOYING": # Estimate 20 minutes total for typical deployment remaining_minutes = max(0, 20 - elapsed_minutes) estimated_completion = datetime.now() + timedelta(minutes=remaining_minutes) else: remaining_minutes = 0 estimated_completion = None return DeploymentTimeline( started_at=started_at, steps=steps, current_step=current_step, progress_percentage=progress, estimated_completion=estimated_completion, elapsed_minutes=elapsed_minutes, remaining_minutes=remaining_minutes, ) except Exception as e: logger.error(f"Error getting deployment timeline: {e}") # Return minimal timeline on error return DeploymentTimeline( started_at=datetime.now(), steps=[], current_step=f"Error: {e}", progress_percentage=0, estimated_completion=None, elapsed_minutes=0, remaining_minutes=0, )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server