We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""Smart deployment orchestration handler."""
import asyncio
from datetime import datetime, timedelta
from typing import Any
from ludus_mcp.core.client import LudusAPIClient
from ludus_mcp.server.handlers.scenarios import ScenarioHandler
from ludus_mcp.server.handlers.deployment import DeploymentHandler
from ludus_mcp.server.handlers.validation import ValidationHandler
from ludus_mcp.server.handlers.snapshots import SnapshotHandler
from ludus_mcp.schemas.orchestration import (
SmartDeployResult,
DeploymentTimeline,
DeploymentStep,
MonitoringUpdate,
RecoveryRecommendation,
)
from ludus_mcp.utils.logging import get_logger
from ludus_mcp.utils.error_formatter import ErrorFormatter
# Module-level logger, obtained from the project's get_logger helper and
# keyed by this module's name.
logger = get_logger(__name__)
class DeploymentOrchestrator:
    """Orchestrates smart deployment workflows with validation, monitoring, and recovery.

    The orchestrator wraps a single Ludus API client and the scenario,
    deployment, validation and snapshot handlers built on top of it. It offers
    three entry points:

    * ``smart_deploy`` - preview, optionally validate, then start a deployment.
    * ``monitor_deployment_once`` - take one status sample of the range.
    * ``get_recovery_recommendation`` - map a failed range's logs to advice.
    """

    def __init__(self, client: LudusAPIClient):
        """Initialize the orchestrator.

        Args:
            client: Ludus API client; the same instance is handed to every
                handler created here.
        """
        self.client = client
        self.scenario_handler = ScenarioHandler(client)
        self.deployment_handler = DeploymentHandler(client)
        self.validation_handler = ValidationHandler(client)
        self.snapshot_handler = SnapshotHandler(client)

    async def smart_deploy(
        self,
        scenario_key: str,
        siem_type: str = "wazuh",
        auto_validate: bool = True,
        auto_snapshot: bool = False,
        auto_monitor: bool = True,
        user_id: str | None = None,
    ) -> SmartDeployResult:
        """
        Smart deployment with validation, optional snapshot, and auto-monitoring.

        Failures are reported through the returned result's ``status`` rather
        than raised: ``"failed"`` (preview or deployment error),
        ``"validation_failed"`` (config rejected) or ``"started"``.

        Args:
            scenario_key: Scenario to deploy
            siem_type: SIEM type to include
            auto_validate: Validate before deploying. A validation *result*
                that is invalid aborts the deployment; an *exception* raised
                while validating is only logged and deployment proceeds.
            auto_snapshot: Request a snapshot before deployment. Snapshot
                creation is not implemented yet: only the intended snapshot
                name is logged and ``snapshot_id`` stays ``None``.
            auto_monitor: Only affects the returned message and the
                ``auto_monitor`` field; no monitoring is started here.
            user_id: Optional user ID

        Returns:
            SmartDeployResult with deployment info and monitoring guidance
        """
        logger.info(
            f"Smart deploy: {scenario_key} with SIEM: {siem_type}, "
            f"validate={auto_validate}, snapshot={auto_snapshot}, monitor={auto_monitor}"
        )

        # Step 1: Get and preview configuration
        try:
            preview = await self.scenario_handler.preview_scenario(scenario_key, siem_type)
        except Exception as e:
            logger.error(f"Failed to get scenario preview: {e}")
            return SmartDeployResult(
                status="failed",
                scenario_key=scenario_key,
                siem_type=siem_type,
                vm_count=0,
                estimated_time="unknown",
                message=f"[ERROR] Failed to get scenario configuration: {e}",
            )

        # Step 2: Validate configuration (if enabled)
        if auto_validate:
            try:
                config = await self.scenario_handler.get_scenario_config(scenario_key, siem_type)
                validation = await self.validation_handler.validate_config(config)

                if not validation.valid:
                    # Format validation errors
                    formatted = ErrorFormatter.format_validation_errors(
                        [e.model_dump() for e in validation.errors],
                        [w.model_dump() for w in validation.warnings] if validation.warnings else None,
                    )
                    return SmartDeployResult(
                        status="validation_failed",
                        scenario_key=scenario_key,
                        siem_type=siem_type,
                        vm_count=preview.vm_count,
                        estimated_time=preview.estimated_time,
                        message=f"[ERROR] Validation failed:\n\n{formatted}",
                    )

                logger.info(f"Validation passed for {scenario_key}")
            except Exception as e:
                # Deliberately do not return: a validator that crashes (e.g. on
                # a format mismatch) must not block the deployment itself.
                logger.error(f"Validation error: {e}")
                logger.warning(f"Validation failed with exception, but proceeding with deployment: {e}")

        # Step 3: Create pre-deployment snapshot (if enabled)
        snapshot_id = None
        if auto_snapshot:
            try:
                # Only an already-populated range has anything to snapshot.
                range_info = await self.client.get_range(user_id)
                if range_info.get("numberOfVMs", 0) > 0:
                    snapshot_name = f"pre-deploy-{scenario_key}-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
                    # TODO: snapshots have to be created per VM through
                    # self.snapshot_handler; until that exists the request is
                    # only logged and snapshot_id remains None.
                    logger.info(f"Snapshot creation requested: {snapshot_name}")
            except Exception as e:
                # Best effort: a snapshot problem never blocks the deployment.
                logger.warning(f"Snapshot creation failed: {e}")

        # Step 4: Deploy scenario
        try:
            # smart_deploy always requests the "recommended" resource profile;
            # callers who need a different profile should call deploy_scenario
            # directly.
            deployment_result = await self.scenario_handler.deploy_scenario(
                scenario_key=scenario_key,
                user_id=user_id,
                ensure_roles=True,
                siem_type=siem_type,
                resource_profile="recommended",
            )
            logger.info(f"Deployment initiated for {scenario_key}")
        except Exception as e:
            logger.error(f"Deployment failed: {e}")
            formatted_error = ErrorFormatter.format_error(str(e))
            return SmartDeployResult(
                status="failed",
                scenario_key=scenario_key,
                siem_type=siem_type,
                vm_count=preview.vm_count,
                estimated_time=preview.estimated_time,
                snapshot_id=snapshot_id,
                message=f"[ERROR] Deployment failed:\n\n{formatted_error}",
            )

        # Step 5: Return success with monitoring instructions
        monitoring_commands = {
            "status": "ludus.quick_status",
            "detailed_status": "ludus.get_deployment_status",
            "health": "ludus.check_deployment_health",
            "logs": "ludus.get_full_logs",
            "monitor": "ludus.monitor_deployment",
        }

        next_check_msg = ""
        if auto_monitor:
            next_check_msg = "\n\n[INFO] Auto-monitoring enabled - I'll check status in 30 seconds and provide updates."

        # NOTE(review): assumes preview.visualization is a string - confirm
        # against the preview schema.
        message = f"""[OK] Deployment Started!
**Scenario:** {scenario_key}
**SIEM:** {siem_type.title()}
**VMs:** {preview.vm_count}
**Estimated Time:** {preview.estimated_time}
**Status:** DEPLOYING
**Started:** {datetime.now().strftime('%H:%M:%S')}
{preview.visualization[:300]}...
{next_check_msg}
**Monitoring Commands:**
- Quick status: `ludus.quick_status`
- Detailed status: `ludus.get_deployment_status`
- Check health: `ludus.check_deployment_health`
- Full logs: `ludus.get_full_logs`
[TIP] Deployments typically take {preview.estimated_time}.
AD services may need 10-15 minutes to fully initialize."""

        # The handler may nest the API response under "deployment_result".
        # Guard the lookup: the deployment has already started at this point,
        # so an unexpected (non-dict) result must not raise here.
        if isinstance(deployment_result, dict):
            actual_deployment_result = deployment_result.get("deployment_result", deployment_result)
        else:
            actual_deployment_result = deployment_result

        deployment_id = None
        if isinstance(actual_deployment_result, dict):
            deployment_id = actual_deployment_result.get("id")

        return SmartDeployResult(
            status="started",
            deployment_id=deployment_id,
            scenario_key=scenario_key,
            siem_type=siem_type,
            vm_count=preview.vm_count,
            estimated_time=preview.estimated_time,
            snapshot_id=snapshot_id,
            auto_monitor=auto_monitor,
            check_interval=30,
            next_check_message=next_check_msg,
            monitoring_commands=monitoring_commands,
            message=message,
        )

    @staticmethod
    def _estimate_progress(range_state: str, vm_count: int, vms_ready: int) -> int:
        """Return a rough 0-100 progress figure for the given range state."""
        if range_state == "SUCCESS":
            return 100
        if range_state == "DEPLOYING":
            if vm_count > 0:
                # Running VMs account for at most 80% while still deploying.
                return int((vms_ready / vm_count) * 80)
            return 10
        # FAILED and every unrecognized state report no progress.
        return 0

    @staticmethod
    def _infer_current_task(recent_activity: list[str]) -> str:
        """Guess the task in progress from the newest entry of *recent_activity*."""
        if not recent_activity:
            return "Initializing..."

        last_line = recent_activity[-1]
        lowered = last_line.lower()
        if "TASK" in last_line:
            # Keep whatever follows the first "TASK" marker, capped at 100 chars.
            return last_line.split("TASK")[1].strip()[:100]
        if any(keyword in lowered for keyword in ("domain", "controller", "dc")):
            return "Configuring domain controller..."
        if "join" in lowered:
            return "Joining VMs to domain..."
        if "wazuh" in lowered or "siem" in lowered:
            return "Setting up SIEM monitoring..."
        return "Initializing..."

    async def monitor_deployment_once(
        self,
        user_id: str | None = None,
        check_number: int = 1,
        max_checks: int = 40,
    ) -> MonitoringUpdate:
        """
        Get a single monitoring update.

        Any error while sampling is caught and reported as an update with
        ``range_state="ERROR"`` and ``should_continue_monitoring=False``.

        Args:
            user_id: Optional user ID
            check_number: Current check number; also used to estimate elapsed
                time, assuming one check every 30 seconds.
            max_checks: Maximum checks before stopping

        Returns:
            MonitoringUpdate with current status
        """
        logger.debug(f"Monitoring deployment (check {check_number}/{max_checks})")

        try:
            # Get current status
            range_info = await self.client.get_range(user_id)
            range_state = range_info.get("rangeState", "UNKNOWN")
            vm_count = range_info.get("numberOfVMs", 0)
            # "VMs" may be present but null; treat that as an empty list.
            vms = range_info.get("VMs") or []

            # NOTE(review): readiness is judged by status == "running" -
            # confirm this matches the VM records returned by the API.
            vms_ready = sum(1 for vm in vms if vm.get("status") == "running")
            progress = self._estimate_progress(range_state, vm_count, vms_ready)

            # Recent activity: up to five non-blank lines taken from the tail
            # of the last ten log lines, so the final entry really is the
            # newest output. Log retrieval is best effort.
            try:
                logs = await self.client.get_range_logs(user_id)
                log_lines = logs.split("\n") if logs else []
                recent_activity = [line for line in log_lines[-10:] if line.strip()][-5:]
            except Exception as log_error:
                logger.debug(f"Could not fetch range logs: {log_error}")
                recent_activity = []

            current_task = self._infer_current_task(recent_activity)

            # Check for issues
            health_check = await self.deployment_handler.check_deployment_health(user_id)
            issues = health_check.get("issues", [])
            is_healthy = health_check.get("health_status") == "healthy"

            # Timing: two checks per minute (30s interval) against an assumed
            # 20-minute typical deployment.
            elapsed_minutes = check_number // 2
            if range_state == "DEPLOYING":
                eta_minutes = max(0, 20 - elapsed_minutes)
            else:
                eta_minutes = 0

            # Keep polling only while still deploying and within budget.
            should_continue = range_state == "DEPLOYING" and check_number < max_checks

            return MonitoringUpdate(
                timestamp=datetime.now(),
                range_state=range_state,
                vm_count=vm_count,
                vms_ready=vms_ready,
                current_task=current_task,
                progress_percentage=progress,
                recent_activity=recent_activity,
                is_healthy=is_healthy,
                issues=issues,
                elapsed_minutes=elapsed_minutes,
                eta_minutes=eta_minutes,
                next_check_in=30 if should_continue else 0,
                should_continue_monitoring=should_continue,
            )
        except Exception as e:
            logger.error(f"Monitoring error: {e}")
            return MonitoringUpdate(
                timestamp=datetime.now(),
                range_state="ERROR",
                vm_count=0,
                vms_ready=0,
                current_task=f"Error: {e}",
                progress_percentage=0,
                is_healthy=False,
                issues=[str(e)],
                elapsed_minutes=0,
                eta_minutes=0,
                should_continue_monitoring=False,
            )

    async def get_recovery_recommendation(
        self,
        user_id: str | None = None,
    ) -> RecoveryRecommendation:
        """
        Get recovery recommendations for failed deployment.

        The range logs are matched, in order, against ADWS, template and
        network patterns; the first match decides the recommendation and an
        unmatched failure yields a generic "destroy" recommendation. Errors
        while analyzing are returned as an ``action="error"`` recommendation
        instead of being raised.

        Args:
            user_id: Optional user ID

        Returns:
            RecoveryRecommendation with action steps
        """
        logger.info("Getting recovery recommendation")

        try:
            range_info = await self.client.get_range(user_id)
            range_state = range_info.get("rangeState", "UNKNOWN")

            # Nothing to recover unless the range actually failed. Checked
            # before fetching logs so a healthy range costs no extra API call
            # and cannot be misreported because log retrieval failed.
            if range_state != "FAILED":
                return RecoveryRecommendation(
                    action="none",
                    reason=f"Range is not failed (state: {range_state})",
                    severity="info",
                    steps=["No recovery needed - range is operational or deploying"],
                )

            logs = await self.client.get_range_logs(user_id)

            # Check for known error patterns (case-insensitive)
            logs_lower = logs.lower() if logs else ""

            # ADWS errors - transient, wait
            if "active directory web services" in logs_lower or "adws" in logs_lower:
                return RecoveryRecommendation(
                    action="wait",
                    reason="Active Directory Web Services not yet started (transient issue)",
                    severity="warning",
                    steps=[
                        "1. Wait 5-10 minutes for AD services to initialize",
                        "2. Check status: ludus.quick_status",
                        "3. Ludus will auto-retry failed tasks",
                        "4. If still failing after 15 min, check logs: ludus.get_full_logs",
                    ],
                    commands={
                        "status": "ludus.quick_status",
                        "health": "ludus.check_deployment_health",
                        "logs": "ludus.get_full_logs",
                    },
                    estimated_recovery_time="5-10 minutes (automatic)",
                )

            # Template errors - config issue
            if "template not found" in logs_lower or ("template" in logs_lower and "error" in logs_lower):
                return RecoveryRecommendation(
                    action="fix_config",
                    reason="VM template not found or invalid",
                    severity="error",
                    steps=[
                        "1. List available templates: ludus.list_templates",
                        "2. Update configuration with correct template name",
                        "3. Validate config: ludus.validate_config",
                        "4. Delete failed range: ludus.delete_range",
                        "5. Redeploy with fixed config: ludus.deploy_range",
                    ],
                    commands={
                        "list_templates": "ludus.list_templates",
                        "validate": "ludus.validate_config(config)",
                        "delete": "ludus.delete_range",
                    },
                    estimated_recovery_time="5 minutes + redeployment time",
                )

            # Network/connectivity issues - may resolve
            if any(keyword in logs_lower for keyword in ("unreachable", "connection refused", "timeout")):
                return RecoveryRecommendation(
                    action="wait",
                    reason="Network connectivity issues detected",
                    severity="warning",
                    steps=[
                        "1. Wait 3-5 minutes for VMs to fully boot",
                        "2. Check VM status: ludus.get_range",
                        "3. Verify VMs are running and accessible",
                        "4. If persists, check network configuration",
                        "5. Consider redeploying if issue continues",
                    ],
                    commands={
                        "check_vms": "ludus.get_range",
                        "status": "ludus.quick_status",
                        "health": "ludus.check_deployment_health",
                    },
                    estimated_recovery_time="3-5 minutes (may auto-recover)",
                )

            # Generic failure
            return RecoveryRecommendation(
                action="destroy",
                reason="Deployment failed with unrecognized error",
                severity="error",
                steps=[
                    "1. Review full logs: ludus.get_full_logs",
                    "2. Identify root cause from logs",
                    "3. Fix configuration if needed",
                    "4. Delete failed range: ludus.delete_range",
                    "5. Redeploy: ludus.deploy_scenario or ludus.smart_deploy",
                ],
                commands={
                    "logs": "ludus.get_full_logs",
                    "delete": "ludus.delete_range",
                    "redeploy": "ludus.smart_deploy(scenario_key='...', siem_type='...')",
                },
                estimated_recovery_time="Depends on issue - review logs first",
            )
        except Exception as e:
            logger.error(f"Error getting recovery recommendation: {e}")
            return RecoveryRecommendation(
                action="error",
                reason=f"Failed to analyze deployment: {e}",
                severity="critical",
                steps=[
                    "1. Check Ludus API connectivity",
                    "2. Verify configuration",
                    "3. Review server logs",
                ],
                commands={},
                estimated_recovery_time="Unknown",
            )