"""Automation and orchestration FastMCP tools for Ludus MCP server."""
from typing import Any
from fastmcp import FastMCP
from ludus_mcp.core.client import LudusAPIClient
from ludus_mcp.server.handlers.automation_orchestration import AutomationOrchestrationHandler
from ludus_mcp.server.handlers.backup import BackupHandler
from ludus_mcp.server.tools.utils import LazyHandlerRegistry, format_tool_response
def create_automation_tools(client: LudusAPIClient) -> FastMCP:
"""Create automation and orchestration tools.
Args:
client: Ludus API client
Returns:
FastMCP instance with automation tools registered
"""
mcp = FastMCP("Automation & Orchestration")
registry = LazyHandlerRegistry(client)
# ==================== AUTOMATION TOOLS ====================
@mcp.tool()
async def create_deployment_pipeline(
name: str,
stages: list[dict[str, Any]],
triggers: dict[str, Any] | None = None,
user_id: str | None = None
) -> dict:
"""Create a deployment pipeline with multiple stages.
Args:
name: Pipeline name
stages: List of pipeline stages with configurations
triggers: Optional trigger conditions (schedule, webhook, etc.)
user_id: Optional user ID (admin only)
Returns:
Pipeline creation result
"""
handler = registry.get_handler("automation", AutomationOrchestrationHandler)
result = await handler.create_deployment_pipeline(name, stages, triggers, user_id)
return format_tool_response(result)
@mcp.tool()
async def schedule_range_tasks(
tasks: list[dict[str, Any]],
schedule: str,
user_id: str | None = None
) -> dict:
"""Schedule recurring tasks for the range.
Args:
tasks: List of tasks to schedule
schedule: Cron expression for scheduling
user_id: Optional user ID (admin only)
Returns:
Task scheduling result
"""
handler = registry.get_handler("automation", AutomationOrchestrationHandler)
result = await handler.schedule_range_tasks(tasks, schedule, user_id)
return format_tool_response(result)
@mcp.tool()
async def auto_scaling(
enable: bool = True,
min_vms: int = 1,
max_vms: int = 10,
scaling_policy: dict[str, Any] | None = None,
user_id: str | None = None
) -> dict:
"""Configure auto-scaling for the range.
Args:
enable: Enable or disable auto-scaling
min_vms: Minimum number of VMs
max_vms: Maximum number of VMs
scaling_policy: Scaling policy configuration
user_id: Optional user ID (admin only)
Returns:
Auto-scaling configuration result
"""
handler = registry.get_handler("automation", AutomationOrchestrationHandler)
result = await handler.auto_scaling(enable, min_vms, max_vms, scaling_policy, user_id)
return format_tool_response(result)
# ==================== SNAPSHOT AUTOMATION TOOLS ====================
@mcp.tool()
async def schedule_snapshots(
vm_names: list[str],
schedule: str,
retention_count: int = 5,
user_id: str | None = None
) -> dict:
"""Schedule automatic snapshots for VMs.
Args:
vm_names: List of VM names to snapshot
schedule: Cron expression for snapshot schedule
retention_count: Number of snapshots to retain
user_id: Optional user ID (admin only)
Returns:
Snapshot scheduling result
"""
handler = registry.get_handler("backup", BackupHandler)
result = await handler.schedule_snapshots(vm_names, schedule, retention_count, user_id)
return format_tool_response(result)
# ==================== RANGE CLONING TOOLS ====================
@mcp.tool()
async def clone_range(
target_user_id: str,
include_snapshots: bool = False,
user_id: str | None = None
) -> dict:
"""Clone the current range to another user.
Args:
target_user_id: User ID to clone range to
include_snapshots: Whether to include snapshots in clone
user_id: Optional user ID (admin only)
Returns:
Clone operation result
"""
handler = registry.get_handler("backup", BackupHandler)
result = await handler.clone_range(target_user_id, include_snapshots, user_id)
return format_tool_response(result)
@mcp.tool()
async def export_range_backup(
include_vms: bool = True,
include_config: bool = True,
user_id: str | None = None
) -> dict:
"""Export range backup.
Args:
include_vms: Include VM disk images in backup
include_config: Include configuration in backup
user_id: Optional user ID (admin only)
Returns:
Backup export result with download link
"""
handler = registry.get_handler("backup", BackupHandler)
result = await handler.export_range_backup(include_vms, include_config, user_id)
return format_tool_response(result)
@mcp.tool()
async def import_range_backup(
backup_file: str,
restore_vms: bool = True,
restore_config: bool = True,
user_id: str | None = None
) -> dict:
"""Import and restore range from backup.
Args:
backup_file: Path to backup file
restore_vms: Restore VM disk images
restore_config: Restore configuration
user_id: Optional user ID (admin only)
Returns:
Backup import result
"""
handler = registry.get_handler("backup", BackupHandler)
result = await handler.import_range_backup(
backup_file, restore_vms, restore_config, user_id
)
return format_tool_response(result)
# ==================== BULK OPERATIONS ====================
@mcp.tool()
async def bulk_vm_operations(
operation: str,
vm_names: list[str] | None = None,
parameters: dict[str, Any] | None = None,
user_id: str | None = None
) -> dict:
"""Perform bulk operations on multiple VMs.
Args:
operation: Operation to perform (power_on, power_off, snapshot, delete)
vm_names: Optional list of VM names (defaults to all VMs)
parameters: Optional operation-specific parameters
user_id: Optional user ID (admin only)
Returns:
Bulk operation results
"""
handler = registry.get_handler("automation", AutomationOrchestrationHandler)
result = await handler.bulk_vm_operations(operation, vm_names, parameters, user_id)
return format_tool_response(result)
@mcp.tool()
async def delete_range(
confirm: bool = False,
user_id: str | None = None
) -> dict:
"""Delete the entire range.
Permanently removes the range and all associated VMs, snapshots, and data.
**Important:** If a deployment is in progress, abort it first with `abort_range_deployment()`.
Args:
confirm: Confirmation flag (must be True to proceed)
user_id: Optional user ID (admin only)
Returns:
Range deletion result
Example:
# Delete current user's range (after aborting if needed)
result = await delete_range(confirm=True)
Workflow:
1. If deployment is active: abort_range_deployment()
2. Then delete: delete_range(confirm=True)
"""
if not confirm:
return {
"error": "delete_range requires confirm=True to proceed",
"warning": "This operation will permanently delete all VMs, snapshots, and data",
"note": "If a deployment is in progress, abort it first with abort_range_deployment()",
"safety": "This will ONLY delete the range for the specified user_id. Other users' ranges are protected."
}
# SAFETY: Always require explicit user_id or use current user
# Pass require_explicit_user=False only when user_id is explicitly provided
result = await client.delete_range(
user_id=user_id,
require_explicit_user=(user_id is not None) # Only require explicit if user_id was provided
)
return format_tool_response(result)
@mcp.tool()
async def abort_and_remove_range(
confirm: bool = False,
user_id: str | None = None
) -> dict:
"""Abort any active deployment and then remove the range.
This is a convenience function that combines abort_range_deployment() and delete_range().
Equivalent to running:
1. `ludus range abort`
2. `ludus rm` (with confirmation)
Args:
confirm: Confirmation flag (must be True to proceed)
user_id: Optional user ID (admin only)
Returns:
Combined abort and deletion results
Example:
# Abort and remove current user's range
result = await abort_and_remove_range(confirm=True)
"""
if not confirm:
return {
"error": "abort_and_remove_range requires confirm=True to proceed",
"warning": "This will abort any active deployment and permanently delete the range",
"workflow": "1. Abort deployment (if active) 2. Delete range"
}
results = {
"abort_result": None,
"delete_result": None,
"status": "success"
}
# First, try to abort any active deployment
try:
abort_result = await client.abort_range_deployment(
user_id=user_id,
require_explicit_user=(user_id is not None)
)
results["abort_result"] = abort_result
results["abort_status"] = "aborted" if abort_result else "no_active_deployment"
except Exception as e:
# If abort fails, it might mean there's no active deployment
results["abort_result"] = {"note": f"No active deployment to abort or error: {str(e)}"}
results["abort_status"] = "skipped"
# Then delete the range with safety checks
try:
delete_result = await client.delete_range(
user_id=user_id,
require_explicit_user=(user_id is not None) # Only require explicit if user_id was provided
)
results["delete_result"] = delete_result
results["delete_status"] = "deleted"
except Exception as e:
results["delete_status"] = "failed"
results["error"] = str(e)
results["status"] = "partial_failure"
return format_tool_response(results)
@mcp.tool()
async def get_recovery_recommendation(user_id: str | None = None) -> dict:
"""Get recovery recommendations for failed deployments.
Args:
user_id: Optional user ID (admin only)
Returns:
Recovery recommendations based on failure analysis
"""
handler = registry.get_handler("automation", AutomationOrchestrationHandler)
result = await handler.get_recovery_recommendation(user_id)
return format_tool_response(result)
return mcp