orchestration.py (18.3 kB)
"""Smart deployment orchestration handler.""" import asyncio from datetime import datetime, timedelta from typing import Any from ludus_mcp.core.client import LudusAPIClient from ludus_mcp.server.handlers.scenarios import ScenarioHandler from ludus_mcp.server.handlers.deployment import DeploymentHandler from ludus_mcp.server.handlers.validation import ValidationHandler from ludus_mcp.server.handlers.snapshots import SnapshotHandler from ludus_mcp.schemas.orchestration import ( SmartDeployResult, DeploymentTimeline, DeploymentStep, MonitoringUpdate, RecoveryRecommendation, ) from ludus_mcp.utils.logging import get_logger from ludus_mcp.utils.error_formatter import ErrorFormatter logger = get_logger(__name__) class DeploymentOrchestrator: """Orchestrates smart deployment workflows with validation, monitoring, and recovery.""" def __init__(self, client: LudusAPIClient): """Initialize the orchestrator.""" self.client = client self.scenario_handler = ScenarioHandler(client) self.deployment_handler = DeploymentHandler(client) self.validation_handler = ValidationHandler(client) self.snapshot_handler = SnapshotHandler(client) async def smart_deploy( self, scenario_key: str, siem_type: str = "wazuh", auto_validate: bool = True, auto_snapshot: bool = False, auto_monitor: bool = True, user_id: str | None = None, ) -> SmartDeployResult: """ Smart deployment with validation, optional snapshot, and auto-monitoring. Args: scenario_key: Scenario to deploy siem_type: SIEM type to include auto_validate: Validate before deploying auto_snapshot: Create snapshot before deployment auto_monitor: Enable auto-monitoring after deployment user_id: Optional user ID Returns: SmartDeployResult with deployment info and monitoring guidance """ logger.info( f"Smart deploy: {scenario_key} with SIEM: {siem_type}, " f"validate={auto_validate}, snapshot={auto_snapshot}, monitor={auto_monitor}" ) # Step 1: Get and preview configuration try: preview = await self.scenario_handler.preview_scenario(scenario_key, siem_type) except Exception as e: logger.error(f"Failed to get scenario preview: {e}") return SmartDeployResult( status="failed", scenario_key=scenario_key, siem_type=siem_type, vm_count=0, estimated_time="unknown", message=f"[ERROR] Failed to get scenario configuration: {e}", ) # Step 2: Validate configuration (if enabled) if auto_validate: try: config = await self.scenario_handler.get_scenario_config(scenario_key, siem_type) validation = await self.validation_handler.validate_config(config) if not validation.valid: # Format validation errors formatted = ErrorFormatter.format_validation_errors( [e.model_dump() for e in validation.errors], [w.model_dump() for w in validation.warnings] if validation.warnings else None, ) return SmartDeployResult( status="validation_failed", scenario_key=scenario_key, siem_type=siem_type, vm_count=preview.vm_count, estimated_time=preview.estimated_time, message=f"[ERROR] Validation failed:\n\n{formatted}", ) logger.info(f"Validation passed for {scenario_key}") except Exception as e: logger.error(f"Validation error: {e}") logger.warning(f"Validation failed with exception, but proceeding with deployment: {e}") # Don't return - continue to deployment step despite validation error # This allows deployment even if validation has issues (e.g., format mismatches) # Step 3: Create pre-deployment snapshot (if enabled) snapshot_id = None if auto_snapshot: try: # Get current range to snapshot range_info = await self.client.get_range(user_id) if range_info.get("numberOfVMs", 0) > 0: snapshot_name = 
f"pre-deploy-{scenario_key}-{datetime.now().strftime('%Y%m%d-%H%M%S')}" # Note: This would need to be implemented per VM logger.info(f"Snapshot creation requested: {snapshot_name}") # snapshot_id = await self.snapshot_handler.create_snapshot(...) except Exception as e: logger.warning(f"Snapshot creation failed: {e}") # Step 4: Deploy scenario try: # Note: resource_profile defaults to "minimal" in deploy_scenario # If you need a different profile, use deploy_scenario directly deployment_result = await self.scenario_handler.deploy_scenario( scenario_key=scenario_key, user_id=user_id, ensure_roles=True, siem_type=siem_type, resource_profile="recommended", # Use recommended profile for smart_deploy ) logger.info(f"Deployment initiated for {scenario_key}") except Exception as e: logger.error(f"Deployment failed: {e}") formatted_error = ErrorFormatter.format_error(str(e)) return SmartDeployResult( status="failed", scenario_key=scenario_key, siem_type=siem_type, vm_count=preview.vm_count, estimated_time=preview.estimated_time, snapshot_id=snapshot_id, message=f"[ERROR] Deployment failed:\n\n{formatted_error}", ) # Step 5: Return success with monitoring instructions monitoring_commands = { "status": "ludus.quick_status", "detailed_status": "ludus.get_deployment_status", "health": "ludus.check_deployment_health", "logs": "ludus.get_full_logs", "monitor": "ludus.monitor_deployment", } next_check_msg = "" if auto_monitor: next_check_msg = "\n\n[INFO] Auto-monitoring enabled - I'll check status in 30 seconds and provide updates." message = f"""[OK] Deployment Started! **Scenario:** {scenario_key} **SIEM:** {siem_type.title()} **VMs:** {preview.vm_count} **Estimated Time:** {preview.estimated_time} **Status:** DEPLOYING **Started:** {datetime.now().strftime('%H:%M:%S')} {preview.visualization[:300]}... {next_check_msg} **Monitoring Commands:** - Quick status: `ludus.quick_status` - Detailed status: `ludus.get_deployment_status` - Check health: `ludus.check_deployment_health` - Full logs: `ludus.get_full_logs` [TIP] Deployments typically take {preview.estimated_time}. AD services may need 10-15 minutes to fully initialize.""" # Extract the actual deployment result from the nested structure actual_deployment_result = deployment_result.get("deployment_result", deployment_result) deployment_id = None if isinstance(actual_deployment_result, dict): deployment_id = actual_deployment_result.get("id") return SmartDeployResult( status="started", deployment_id=deployment_id, scenario_key=scenario_key, siem_type=siem_type, vm_count=preview.vm_count, estimated_time=preview.estimated_time, snapshot_id=snapshot_id, auto_monitor=auto_monitor, check_interval=30, next_check_message=next_check_msg, monitoring_commands=monitoring_commands, message=message, ) async def monitor_deployment_once( self, user_id: str | None = None, check_number: int = 1, max_checks: int = 40, ) -> MonitoringUpdate: """ Get a single monitoring update. 
Args: user_id: Optional user ID check_number: Current check number max_checks: Maximum checks before stopping Returns: MonitoringUpdate with current status """ logger.debug(f"Monitoring deployment (check {check_number}/{max_checks})") try: # Get current status range_info = await self.client.get_range(user_id) range_state = range_info.get("rangeState", "UNKNOWN") vm_count = range_info.get("numberOfVMs", 0) vms = range_info.get("VMs", []) # Count ready VMs vms_ready = sum(1 for vm in vms if vm.get("status") == "running") # Calculate progress if range_state == "SUCCESS": progress = 100 elif range_state == "DEPLOYING": # Estimate based on ready VMs if vm_count > 0: progress = int((vms_ready / vm_count) * 80) # Max 80% during deployment else: progress = 10 elif range_state == "FAILED": progress = 0 else: progress = 0 # Get logs for recent activity try: logs = await self.client.get_range_logs(user_id) # Extract last few lines log_lines = logs.split('\n') if logs else [] recent_activity = [line for line in log_lines[-10:] if line.strip()][:5] except Exception: recent_activity = [] # Determine current task from logs current_task = "Initializing..." if recent_activity: last_line = recent_activity[-1] if recent_activity else "" if "TASK" in last_line: current_task = last_line.split("TASK")[1].strip()[:100] elif any(keyword in last_line.lower() for keyword in ["domain", "controller", "dc"]): current_task = "Configuring domain controller..." elif "join" in last_line.lower(): current_task = "Joining VMs to domain..." elif "wazuh" in last_line.lower() or "siem" in last_line.lower(): current_task = "Setting up SIEM monitoring..." # Check for issues health_check = await self.deployment_handler.check_deployment_health(user_id) issues = health_check.get("issues", []) is_healthy = health_check.get("health_status") == "healthy" # Calculate timing # Estimate 20 minutes for typical deployment elapsed_minutes = check_number // 2 # Assuming 30s intervals if range_state == "DEPLOYING": eta_minutes = max(0, 20 - elapsed_minutes) else: eta_minutes = 0 # Determine if should continue monitoring should_continue = ( range_state == "DEPLOYING" and check_number < max_checks ) return MonitoringUpdate( timestamp=datetime.now(), range_state=range_state, vm_count=vm_count, vms_ready=vms_ready, current_task=current_task, progress_percentage=progress, recent_activity=recent_activity, is_healthy=is_healthy, issues=issues, elapsed_minutes=elapsed_minutes, eta_minutes=eta_minutes, next_check_in=30 if should_continue else 0, should_continue_monitoring=should_continue, ) except Exception as e: logger.error(f"Monitoring error: {e}") return MonitoringUpdate( timestamp=datetime.now(), range_state="ERROR", vm_count=0, vms_ready=0, current_task=f"Error: {e}", progress_percentage=0, is_healthy=False, issues=[str(e)], elapsed_minutes=0, eta_minutes=0, should_continue_monitoring=False, ) async def get_recovery_recommendation( self, user_id: str | None = None, ) -> RecoveryRecommendation: """ Get recovery recommendations for failed deployment. 
Args: user_id: Optional user ID Returns: RecoveryRecommendation with action steps """ logger.info("Getting recovery recommendation") try: range_info = await self.client.get_range(user_id) range_state = range_info.get("rangeState", "UNKNOWN") logs = await self.client.get_range_logs(user_id) # Analyze failure if range_state != "FAILED": return RecoveryRecommendation( action="none", reason=f"Range is not failed (state: {range_state})", severity="info", steps=["No recovery needed - range is operational or deploying"], ) # Check for known error patterns logs_lower = logs.lower() if logs else "" # ADWS errors - transient, wait if "active directory web services" in logs_lower or "adws" in logs_lower: return RecoveryRecommendation( action="wait", reason="Active Directory Web Services not yet started (transient issue)", severity="warning", steps=[ "1. Wait 5-10 minutes for AD services to initialize", "2. Check status: ludus.quick_status", "3. Ludus will auto-retry failed tasks", "4. If still failing after 15 min, check logs: ludus.get_full_logs", ], commands={ "status": "ludus.quick_status", "health": "ludus.check_deployment_health", "logs": "ludus.get_full_logs", }, estimated_recovery_time="5-10 minutes (automatic)", ) # Template errors - config issue if "template not found" in logs_lower or "template" in logs_lower and "error" in logs_lower: return RecoveryRecommendation( action="fix_config", reason="VM template not found or invalid", severity="error", steps=[ "1. List available templates: ludus.list_templates", "2. Update configuration with correct template name", "3. Validate config: ludus.validate_config", "4. Delete failed range: ludus.delete_range", "5. Redeploy with fixed config: ludus.deploy_range", ], commands={ "list_templates": "ludus.list_templates", "validate": "ludus.validate_config(config)", "delete": "ludus.delete_range", }, estimated_recovery_time="5 minutes + redeployment time", ) # Network/connectivity issues - may resolve if any(keyword in logs_lower for keyword in ["unreachable", "connection refused", "timeout"]): return RecoveryRecommendation( action="wait", reason="Network connectivity issues detected", severity="warning", steps=[ "1. Wait 3-5 minutes for VMs to fully boot", "2. Check VM status: ludus.get_range", "3. Verify VMs are running and accessible", "4. If persists, check network configuration", "5. Consider redeploying if issue continues", ], commands={ "check_vms": "ludus.get_range", "status": "ludus.quick_status", "health": "ludus.check_deployment_health", }, estimated_recovery_time="3-5 minutes (may auto-recover)", ) # Generic failure return RecoveryRecommendation( action="destroy", reason="Deployment failed with unrecognized error", severity="error", steps=[ "1. Review full logs: ludus.get_full_logs", "2. Identify root cause from logs", "3. Fix configuration if needed", "4. Delete failed range: ludus.delete_range", "5. Redeploy: ludus.deploy_scenario or ludus.smart_deploy", ], commands={ "logs": "ludus.get_full_logs", "delete": "ludus.delete_range", "redeploy": "ludus.smart_deploy(scenario_key='...', siem_type='...')", }, estimated_recovery_time="Depends on issue - review logs first", ) except Exception as e: logger.error(f"Error getting recovery recommendation: {e}") return RecoveryRecommendation( action="error", reason=f"Failed to analyze deployment: {e}", severity="critical", steps=[ "1. Check Ludus API connectivity", "2. Verify configuration", "3. Review server logs", ], commands={}, estimated_recovery_time="Unknown", )
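Below is a minimal usage sketch (not part of orchestration.py) showing how a caller might drive the orchestrator end to end: start a smart deployment, poll for monitoring updates, and ask for a recovery recommendation if the range fails. It assumes this module is importable as ludus_mcp.server.handlers.orchestration alongside the other handlers, and that LudusAPIClient accepts a base URL and API key; the scenario key "ad-lab" is likewise hypothetical.

# Illustrative usage sketch - adjust constructor arguments and scenario key to your setup.
import asyncio

from ludus_mcp.core.client import LudusAPIClient
from ludus_mcp.server.handlers.orchestration import DeploymentOrchestrator


async def main() -> None:
    # Hypothetical constructor arguments; check LudusAPIClient for its real signature.
    client = LudusAPIClient(base_url="https://ludus.example.com:8080", api_key="...")
    orchestrator = DeploymentOrchestrator(client)

    # Kick off a smart deployment with validation and auto-monitoring enabled (the defaults).
    result = await orchestrator.smart_deploy(scenario_key="ad-lab", siem_type="wazuh")
    print(result.message)

    # Poll for status updates until the orchestrator says monitoring can stop.
    check = 1
    while True:
        update = await orchestrator.monitor_deployment_once(check_number=check)
        print(f"{update.range_state}: {update.progress_percentage}% - {update.current_task}")
        if not update.should_continue_monitoring:
            break
        await asyncio.sleep(update.next_check_in or 30)
        check += 1

    # If the range ended up FAILED, ask for a recovery recommendation.
    if update.range_state == "FAILED":
        recommendation = await orchestrator.get_recovery_recommendation()
        print(recommendation.reason)
        print("\n".join(recommendation.steps))


if __name__ == "__main__":
    asyncio.run(main())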
