Skip to main content
Glama
resource_management.py11.5 kB
"""Handler for resource management operations.""" from datetime import datetime, timedelta from typing import Any from ludus_mcp.core.client import LudusAPIClient from ludus_mcp.utils.logging import get_logger logger = get_logger(__name__) class ResourceManagementHandler: """Handler for resource management.""" def __init__(self, client: LudusAPIClient) -> None: """Initialize the resource management handler.""" self.client = client async def get_resource_quotas(self, user_id: str | None = None) -> dict[str, Any]: """View resource limits and usage.""" try: range_info = await self.client.get_range(user_id) vms = range_info.get("VMs", []) # Calculate current usage current_usage = { "vms": len(vms), "memory_mb": sum(vm.get("memory", 0) for vm in vms), "cpus": sum(vm.get("cpus", 0) for vm in vms), "disk_gb": sum(vm.get("disk", 0) for vm in vms) } # Define quotas (example values - should be configurable) quotas = { "max_vms": 50, "max_memory_gb": 256, "max_cpus": 64, "max_disk_gb": 2000 } # Calculate usage percentages usage_percentages = { "vms": (current_usage["vms"] / quotas["max_vms"] * 100), "memory": (current_usage["memory_mb"] / 1024 / quotas["max_memory_gb"] * 100), "cpus": (current_usage["cpus"] / quotas["max_cpus"] * 100), "disk": (current_usage["disk_gb"] / quotas["max_disk_gb"] * 100) } # Check for quota warnings warnings = [] for resource, percentage in usage_percentages.items(): if percentage >= 90: warnings.append(f"{resource.upper()} usage at {percentage:.1f}% - approaching limit") elif percentage >= 75: warnings.append(f"{resource.upper()} usage at {percentage:.1f}% - monitor closely") return { "status": "success", "timestamp": datetime.now().isoformat(), "current_usage": current_usage, "quotas": quotas, "usage_percentages": {k: round(v, 2) for k, v in usage_percentages.items()}, "available_resources": { "vms": quotas["max_vms"] - current_usage["vms"], "memory_gb": quotas["max_memory_gb"] - (current_usage["memory_mb"] / 1024), "cpus": quotas["max_cpus"] - current_usage["cpus"], "disk_gb": quotas["max_disk_gb"] - current_usage["disk_gb"] }, "warnings": warnings } except Exception as e: logger.error(f"Error getting resource quotas: {e}") return {"status": "error", "error": str(e)} async def optimize_resource_allocation(self, user_id: str | None = None) -> dict[str, Any]: """Suggest resource optimization.""" try: range_info = await self.client.get_range(user_id) vms = range_info.get("VMs", []) optimizations = [] total_savings = {"memory_mb": 0, "cpus": 0, "disk_gb": 0} for vm in vms: vm_name = vm.get("name", "unknown") memory = vm.get("memory", 0) cpus = vm.get("cpus", 0) disk = vm.get("disk", 0) # Memory optimization if memory > 8192: savings = memory - 8192 optimizations.append({ "vm": vm_name, "resource": "memory", "current": memory, "suggested": 8192, "savings_mb": savings, "reason": "High memory allocation, consider reducing" }) total_savings["memory_mb"] += savings # CPU optimization if cpus > 4: savings = cpus - 4 optimizations.append({ "vm": vm_name, "resource": "cpu", "current": cpus, "suggested": 4, "savings_cpus": savings, "reason": "High CPU allocation, most workloads use 2-4 cores" }) total_savings["cpus"] += savings # Disk optimization if disk > 100: savings = disk - 80 optimizations.append({ "vm": vm_name, "resource": "disk", "current": disk, "suggested": 80, "savings_gb": savings, "reason": "Large disk allocation, review actual usage" }) total_savings["disk_gb"] += savings return { "status": "success", "timestamp": datetime.now().isoformat(), "optimizations": optimizations, "total_savings": total_savings, "potential_cost_savings": { "monthly_usd": round( (total_savings["memory_mb"] / 1024 * 0.01 * 24 * 30) + (total_savings["cpus"] * 0.05 * 24 * 30) + (total_savings["disk_gb"] * 0.10), 2 ) }, "summary": f"{len(optimizations)} optimization opportunities found" } except Exception as e: logger.error(f"Error optimizing resource allocation: {e}") return {"status": "error", "error": str(e)} async def schedule_maintenance_window( self, start_time: str, duration_hours: int, description: str, user_id: str | None = None ) -> dict[str, Any]: """Plan maintenance with minimal disruption.""" try: range_info = await self.client.get_range(user_id) # Parse start time try: start_dt = datetime.fromisoformat(start_time) except Exception: return { "status": "error", "error": "Invalid start_time format. Use ISO format: YYYY-MM-DDTHH:MM:SS" } end_dt = start_dt + timedelta(hours=duration_hours) maintenance_window = { "start_time": start_dt.isoformat(), "end_time": end_dt.isoformat(), "duration_hours": duration_hours, "description": description, "range_state": range_info.get("rangeState"), "pre_maintenance_tasks": [ {"task": "Create snapshots of all VMs", "tool": "ludus.snapshot_host"}, {"task": "Notify users of downtime", "tool": "communication"}, {"task": "Document current configuration", "tool": "ludus.export_range_backup"} ], "maintenance_tasks": [ {"task": description, "estimated_time": f"{duration_hours}h"} ], "post_maintenance_tasks": [ {"task": "Verify all VMs are running", "tool": "ludus.get_range"}, {"task": "Test connectivity", "tool": "ludus.test_network_connectivity"}, {"task": "Notify users of completion", "tool": "communication"} ], "rollback_plan": { "trigger": "If maintenance fails", "action": "Use ludus.rollback_snapshot to restore pre-maintenance state" } } return { "status": "success", "timestamp": datetime.now().isoformat(), "maintenance_window": maintenance_window, "affected_vms": range_info.get("numberOfVMs", 0) } except Exception as e: logger.error(f"Error scheduling maintenance window: {e}") return {"status": "error", "error": str(e)} async def bulk_vm_operations( self, operation: str, vm_filter: dict | None = None, user_id: str | None = None ) -> dict[str, Any]: """Bulk operations on multiple VMs.""" try: valid_operations = ["power_on", "power_off", "snapshot", "delete", "restart"] if operation not in valid_operations: return { "status": "error", "error": f"Invalid operation. Must be one of: {valid_operations}" } range_info = await self.client.get_range(user_id) vms = range_info.get("VMs", []) # Apply filters if specified if vm_filter: if "status" in vm_filter: vms = [vm for vm in vms if vm.get("status") == vm_filter["status"]] if "template" in vm_filter: vms = [vm for vm in vms if vm_filter["template"] in vm.get("template", "")] if "name_pattern" in vm_filter: pattern = vm_filter["name_pattern"].lower() vms = [vm for vm in vms if pattern in vm.get("name", "").lower()] # Generate operation plan operations_plan = [] for vm in vms: operations_plan.append({ "vm_name": vm.get("name"), "current_status": vm.get("status"), "operation": operation, "estimated_time_seconds": self._estimate_operation_time(operation) }) total_time = sum(op["estimated_time_seconds"] for op in operations_plan) return { "status": "success", "timestamp": datetime.now().isoformat(), "operation": operation, "filter_applied": vm_filter or "none", "affected_vms": len(operations_plan), "operations_plan": operations_plan, "estimated_total_time": { "seconds": total_time, "minutes": round(total_time / 60, 2) }, "execution_guide": { "sequential": "Execute operations one by one", "parallel": "Use Ansible with async for parallel execution", "tool_mapping": { "power_on": "ludus.power_on_range", "power_off": "ludus.power_off_range", "snapshot": "ludus.snapshot_host (loop through VMs)" } } } except Exception as e: logger.error(f"Error planning bulk VM operations: {e}") return {"status": "error", "error": str(e)} def _estimate_operation_time(self, operation: str) -> int: """Estimate operation time in seconds.""" estimates = { "power_on": 60, "power_off": 30, "snapshot": 120, "delete": 45, "restart": 90 } return estimates.get(operation, 60)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server