We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""Handler for resource management operations."""
from datetime import datetime, timedelta
from typing import Any
from ludus_mcp.core.client import LudusAPIClient
from ludus_mcp.utils.logging import get_logger
logger = get_logger(__name__)
class ResourceManagementHandler:
"""Handler for resource management."""
def __init__(self, client: LudusAPIClient) -> None:
"""Initialize the resource management handler."""
self.client = client
async def get_resource_quotas(self, user_id: str | None = None) -> dict[str, Any]:
"""View resource limits and usage."""
try:
range_info = await self.client.get_range(user_id)
vms = range_info.get("VMs", [])
# Calculate current usage
current_usage = {
"vms": len(vms),
"memory_mb": sum(vm.get("memory", 0) for vm in vms),
"cpus": sum(vm.get("cpus", 0) for vm in vms),
"disk_gb": sum(vm.get("disk", 0) for vm in vms)
}
# Define quotas (example values - should be configurable)
quotas = {
"max_vms": 50,
"max_memory_gb": 256,
"max_cpus": 64,
"max_disk_gb": 2000
}
# Calculate usage percentages
usage_percentages = {
"vms": (current_usage["vms"] / quotas["max_vms"] * 100),
"memory": (current_usage["memory_mb"] / 1024 / quotas["max_memory_gb"] * 100),
"cpus": (current_usage["cpus"] / quotas["max_cpus"] * 100),
"disk": (current_usage["disk_gb"] / quotas["max_disk_gb"] * 100)
}
# Check for quota warnings
warnings = []
for resource, percentage in usage_percentages.items():
if percentage >= 90:
warnings.append(f"{resource.upper()} usage at {percentage:.1f}% - approaching limit")
elif percentage >= 75:
warnings.append(f"{resource.upper()} usage at {percentage:.1f}% - monitor closely")
return {
"status": "success",
"timestamp": datetime.now().isoformat(),
"current_usage": current_usage,
"quotas": quotas,
"usage_percentages": {k: round(v, 2) for k, v in usage_percentages.items()},
"available_resources": {
"vms": quotas["max_vms"] - current_usage["vms"],
"memory_gb": quotas["max_memory_gb"] - (current_usage["memory_mb"] / 1024),
"cpus": quotas["max_cpus"] - current_usage["cpus"],
"disk_gb": quotas["max_disk_gb"] - current_usage["disk_gb"]
},
"warnings": warnings
}
except Exception as e:
logger.error(f"Error getting resource quotas: {e}")
return {"status": "error", "error": str(e)}
async def optimize_resource_allocation(self, user_id: str | None = None) -> dict[str, Any]:
"""Suggest resource optimization."""
try:
range_info = await self.client.get_range(user_id)
vms = range_info.get("VMs", [])
optimizations = []
total_savings = {"memory_mb": 0, "cpus": 0, "disk_gb": 0}
for vm in vms:
vm_name = vm.get("name", "unknown")
memory = vm.get("memory", 0)
cpus = vm.get("cpus", 0)
disk = vm.get("disk", 0)
# Memory optimization
if memory > 8192:
savings = memory - 8192
optimizations.append({
"vm": vm_name,
"resource": "memory",
"current": memory,
"suggested": 8192,
"savings_mb": savings,
"reason": "High memory allocation, consider reducing"
})
total_savings["memory_mb"] += savings
# CPU optimization
if cpus > 4:
savings = cpus - 4
optimizations.append({
"vm": vm_name,
"resource": "cpu",
"current": cpus,
"suggested": 4,
"savings_cpus": savings,
"reason": "High CPU allocation, most workloads use 2-4 cores"
})
total_savings["cpus"] += savings
# Disk optimization
if disk > 100:
savings = disk - 80
optimizations.append({
"vm": vm_name,
"resource": "disk",
"current": disk,
"suggested": 80,
"savings_gb": savings,
"reason": "Large disk allocation, review actual usage"
})
total_savings["disk_gb"] += savings
return {
"status": "success",
"timestamp": datetime.now().isoformat(),
"optimizations": optimizations,
"total_savings": total_savings,
"potential_cost_savings": {
"monthly_usd": round(
(total_savings["memory_mb"] / 1024 * 0.01 * 24 * 30) +
(total_savings["cpus"] * 0.05 * 24 * 30) +
(total_savings["disk_gb"] * 0.10),
2
)
},
"summary": f"{len(optimizations)} optimization opportunities found"
}
except Exception as e:
logger.error(f"Error optimizing resource allocation: {e}")
return {"status": "error", "error": str(e)}
async def schedule_maintenance_window(
self,
start_time: str,
duration_hours: int,
description: str,
user_id: str | None = None
) -> dict[str, Any]:
"""Plan maintenance with minimal disruption."""
try:
range_info = await self.client.get_range(user_id)
# Parse start time
try:
start_dt = datetime.fromisoformat(start_time)
except Exception:
return {
"status": "error",
"error": "Invalid start_time format. Use ISO format: YYYY-MM-DDTHH:MM:SS"
}
end_dt = start_dt + timedelta(hours=duration_hours)
maintenance_window = {
"start_time": start_dt.isoformat(),
"end_time": end_dt.isoformat(),
"duration_hours": duration_hours,
"description": description,
"range_state": range_info.get("rangeState"),
"pre_maintenance_tasks": [
{"task": "Create snapshots of all VMs", "tool": "ludus.snapshot_host"},
{"task": "Notify users of downtime", "tool": "communication"},
{"task": "Document current configuration", "tool": "ludus.export_range_backup"}
],
"maintenance_tasks": [
{"task": description, "estimated_time": f"{duration_hours}h"}
],
"post_maintenance_tasks": [
{"task": "Verify all VMs are running", "tool": "ludus.get_range"},
{"task": "Test connectivity", "tool": "ludus.test_network_connectivity"},
{"task": "Notify users of completion", "tool": "communication"}
],
"rollback_plan": {
"trigger": "If maintenance fails",
"action": "Use ludus.rollback_snapshot to restore pre-maintenance state"
}
}
return {
"status": "success",
"timestamp": datetime.now().isoformat(),
"maintenance_window": maintenance_window,
"affected_vms": range_info.get("numberOfVMs", 0)
}
except Exception as e:
logger.error(f"Error scheduling maintenance window: {e}")
return {"status": "error", "error": str(e)}
async def bulk_vm_operations(
self,
operation: str,
vm_filter: dict | None = None,
user_id: str | None = None
) -> dict[str, Any]:
"""Bulk operations on multiple VMs."""
try:
valid_operations = ["power_on", "power_off", "snapshot", "delete", "restart"]
if operation not in valid_operations:
return {
"status": "error",
"error": f"Invalid operation. Must be one of: {valid_operations}"
}
range_info = await self.client.get_range(user_id)
vms = range_info.get("VMs", [])
# Apply filters if specified
if vm_filter:
if "status" in vm_filter:
vms = [vm for vm in vms if vm.get("status") == vm_filter["status"]]
if "template" in vm_filter:
vms = [vm for vm in vms if vm_filter["template"] in vm.get("template", "")]
if "name_pattern" in vm_filter:
pattern = vm_filter["name_pattern"].lower()
vms = [vm for vm in vms if pattern in vm.get("name", "").lower()]
# Generate operation plan
operations_plan = []
for vm in vms:
operations_plan.append({
"vm_name": vm.get("name"),
"current_status": vm.get("status"),
"operation": operation,
"estimated_time_seconds": self._estimate_operation_time(operation)
})
total_time = sum(op["estimated_time_seconds"] for op in operations_plan)
return {
"status": "success",
"timestamp": datetime.now().isoformat(),
"operation": operation,
"filter_applied": vm_filter or "none",
"affected_vms": len(operations_plan),
"operations_plan": operations_plan,
"estimated_total_time": {
"seconds": total_time,
"minutes": round(total_time / 60, 2)
},
"execution_guide": {
"sequential": "Execute operations one by one",
"parallel": "Use Ansible with async for parallel execution",
"tool_mapping": {
"power_on": "ludus.power_on_range",
"power_off": "ludus.power_off_range",
"snapshot": "ludus.snapshot_host (loop through VMs)"
}
}
}
except Exception as e:
logger.error(f"Error planning bulk VM operations: {e}")
return {"status": "error", "error": str(e)}
def _estimate_operation_time(self, operation: str) -> int:
"""Estimate operation time in seconds."""
estimates = {
"power_on": 60,
"power_off": 30,
"snapshot": 120,
"delete": 45,
"restart": 90
}
return estimates.get(operation, 60)