Skip to main content
Glama
range_management.py17.1 kB
"""Enhanced range management tools for Ludus MCP server. This module provides tools for managing individual ranges with better identification, naming, and selective deletion capabilities. """ from typing import Any from fastmcp import FastMCP from ludus_mcp.core.client import LudusAPIClient from ludus_mcp.server.handlers.ranges import RangeHandler from ludus_mcp.server.tools.utils import LazyHandlerRegistry, format_tool_response def create_range_management_tools(client: LudusAPIClient) -> FastMCP: """Create enhanced range management tools. Args: client: Ludus API client Returns: FastMCP instance with range management tools registered """ mcp = FastMCP("Range Management") registry = LazyHandlerRegistry(client) # ==================== RANGE LISTING & IDENTIFICATION ==================== @mcp.tool() async def list_all_ranges_detailed() -> dict: """List all ranges with detailed information including identification. This tool provides comprehensive information about all ranges in the system, making it easy to identify which range belongs to which user or purpose. Returns: Detailed list of all ranges with: - User ID (unique identifier) - Range number/name - Range state/status - Last deployment time - VM count - Network configuration Example: # List all ranges to see which ones exist result = await list_all_ranges_detailed() # Output shows: { "total_ranges": 3, "ranges": [ { "user_id": "tjnull", "range_number": "10", "status": "SUCCESS", "vm_count": 5, "last_deployment": "2024-12-11T10:30:00Z", "identifier": "tjnull-range-10" }, ... ] } """ handler = registry.get_handler("range", RangeHandler) # Get all ranges from API all_ranges = await handler.client.list_ranges() detailed_ranges = [] for range_data in all_ranges: # Extract detailed information user_id = range_data.get("userID", "unknown") range_number = range_data.get("rangeNumber", "") range_state = range_data.get("rangeState", "UNKNOWN") last_deployment = range_data.get("lastDeployment") # Get VM count if available vms = range_data.get("VMs", []) vm_count = len(vms) if isinstance(vms, list) else 0 # Get network info if available networks = range_data.get("networks", []) network_count = len(networks) if isinstance(networks, list) else 0 # Create a unique identifier identifier = f"{user_id}-range-{range_number}" if range_number else user_id detailed_ranges.append({ "user_id": user_id, "range_number": range_number, "status": range_state, "vm_count": vm_count, "network_count": network_count, "last_deployment": last_deployment, "identifier": identifier, "raw_data": range_data # Include raw data for debugging }) return { "total_ranges": len(detailed_ranges), "ranges": detailed_ranges, "summary": { "active": sum(1 for r in detailed_ranges if r["status"] == "SUCCESS"), "deploying": sum(1 for r in detailed_ranges if "DEPLOY" in r["status"]), "error": sum(1 for r in detailed_ranges if "ERROR" in r["status"] or "FAIL" in r["status"]), } } @mcp.tool() async def get_range_by_user(user_id: str) -> dict: """Get detailed information about a specific user's range. Args: user_id: User ID whose range to retrieve Returns: Detailed range information for the specified user Example: # Get range information for user 'tjnull' result = await get_range_by_user(user_id="tjnull") """ handler = registry.get_handler("range", RangeHandler) # Get range for specific user range_data = await handler.client.get_range(user_id) # Extract detailed information vms = range_data.get("VMs", []) networks = range_data.get("networks", []) return { "user_id": user_id, "range_number": range_data.get("rangeNumber"), "status": range_data.get("rangeState"), "last_deployment": range_data.get("lastDeployment"), "vm_count": len(vms) if isinstance(vms, list) else 0, "network_count": len(networks) if isinstance(networks, list) else 0, "vms": [ { "name": vm.get("name"), "hostname": vm.get("hostname"), "template": vm.get("template"), "status": vm.get("status"), "ip": vm.get("ip"), } for vm in (vms if isinstance(vms, list) else []) ], "networks": networks, "full_data": range_data } @mcp.tool() async def find_range_by_vm_name(vm_name: str) -> dict: """Find which range contains a specific VM by name. Args: vm_name: Name of the VM to search for Returns: Range information containing the VM, or None if not found Example: # Find which range contains the VM 'DC01' result = await find_range_by_vm_name(vm_name="DC01") """ handler = registry.get_handler("range", RangeHandler) # Get all ranges all_ranges = await handler.client.list_ranges() for range_data in all_ranges: vms = range_data.get("VMs", []) if isinstance(vms, list): for vm in vms: if vm_name.lower() in vm.get("name", "").lower() or \ vm_name.lower() in vm.get("hostname", "").lower(): return { "found": True, "user_id": range_data.get("userID"), "range_number": range_data.get("rangeNumber"), "vm_details": vm, "range_status": range_data.get("rangeState"), } return { "found": False, "message": f"No range found containing VM '{vm_name}'" } # ==================== SELECTIVE RANGE DELETION ==================== @mcp.tool() async def delete_range_by_user( user_id: str, confirm: bool = False ) -> dict: """Delete a specific user's range (requires confirmation). This tool allows you to selectively delete a single range by user ID, rather than destroying all ranges. Requires explicit confirmation. Args: user_id: User ID whose range to delete confirm: Must be set to True to actually delete (safety measure) Returns: Deletion result with confirmation details Example: # First, check what you're deleting range_info = await get_range_by_user(user_id="testuser") # Then confirm deletion result = await delete_range_by_user( user_id="testuser", confirm=True ) Safety: - Requires confirm=True to prevent accidental deletion - Shows range details before deletion - Cannot be undone """ if not confirm: # Show what would be deleted without actually deleting handler = registry.get_handler("range", RangeHandler) try: range_data = await handler.client.get_range(user_id) vms = range_data.get("VMs", []) return { "status": "preview", "message": f"[WARNING] DELETION PREVIEW - Set confirm=True to delete", "user_id": user_id, "range_number": range_data.get("rangeNumber"), "range_status": range_data.get("rangeState"), "vm_count": len(vms) if isinstance(vms, list) else 0, "vm_names": [vm.get("name") for vm in (vms if isinstance(vms, list) else [])], "warning": "This range will be PERMANENTLY DELETED if you set confirm=True", "next_step": f"To delete, call: delete_range_by_user(user_id='{user_id}', confirm=True)" } except Exception as e: return { "status": "error", "message": f"Range for user '{user_id}' not found or error: {e}" } # Actually delete the range handler = registry.get_handler("range", RangeHandler) # Get range info before deletion for confirmation try: range_data = await handler.client.get_range(user_id) range_number = range_data.get("rangeNumber") vm_count = len(range_data.get("VMs", [])) except Exception: range_number = "unknown" vm_count = 0 # Perform deletion result = await handler.client.delete_range(user_id) return { "status": "deleted", "message": f"[OK] Successfully deleted range for user '{user_id}'", "user_id": user_id, "range_number": range_number, "vms_removed": vm_count, "deletion_result": result } @mcp.tool() async def delete_ranges_by_status( status_filter: str, confirm: bool = False ) -> dict: """Delete all ranges matching a specific status (requires confirmation). Useful for cleaning up failed deployments or test ranges. Args: status_filter: Status to filter by (e.g., "ERROR", "FAILED", "DEPLOYING") confirm: Must be set to True to actually delete (safety measure) Returns: Deletion results for matching ranges Example: # Delete all ranges with ERROR status result = await delete_ranges_by_status( status_filter="ERROR", confirm=True ) Safety: - Requires confirm=True to prevent accidental deletion - Shows preview of what will be deleted - Cannot be undone """ handler = registry.get_handler("range", RangeHandler) # Get all ranges all_ranges = await handler.client.list_ranges() # Filter by status matching_ranges = [ r for r in all_ranges if status_filter.upper() in r.get("rangeState", "").upper() ] if not confirm: # Preview mode return { "status": "preview", "message": f"[WARNING] DELETION PREVIEW - Set confirm=True to delete", "status_filter": status_filter, "matching_ranges": len(matching_ranges), "ranges_to_delete": [ { "user_id": r.get("userID"), "range_number": r.get("rangeNumber"), "status": r.get("rangeState"), "vm_count": len(r.get("VMs", [])) } for r in matching_ranges ], "warning": f"These {len(matching_ranges)} ranges will be PERMANENTLY DELETED if you set confirm=True", "next_step": f"To delete, call: delete_ranges_by_status(status_filter='{status_filter}', confirm=True)" } # Actually delete matching ranges deletion_results = [] for range_data in matching_ranges: user_id = range_data.get("userID") try: result = await handler.client.delete_range(user_id) deletion_results.append({ "user_id": user_id, "status": "deleted", "result": result }) except Exception as e: deletion_results.append({ "user_id": user_id, "status": "error", "error": str(e) }) successful = sum(1 for r in deletion_results if r["status"] == "deleted") failed = sum(1 for r in deletion_results if r["status"] == "error") return { "status": "completed", "message": f"[OK] Deleted {successful} ranges, {failed} failed", "status_filter": status_filter, "total_processed": len(deletion_results), "successful_deletions": successful, "failed_deletions": failed, "results": deletion_results } @mcp.tool() async def cleanup_old_ranges( keep_user_ids: list[str], confirm: bool = False ) -> dict: """Delete all ranges EXCEPT those belonging to specified users. Useful for cleaning up test/temporary ranges while keeping production ones. Args: keep_user_ids: List of user IDs whose ranges should be KEPT (not deleted) confirm: Must be set to True to actually delete (safety measure) Returns: Deletion results Example: # Keep ranges for 'admin' and 'tjnull', delete all others result = await cleanup_old_ranges( keep_user_ids=["admin", "tjnull"], confirm=True ) Safety: - Requires confirm=True to prevent accidental deletion - Shows preview of what will be deleted - Explicitly protects specified user ranges """ handler = registry.get_handler("range", RangeHandler) # Get all ranges all_ranges = await handler.client.list_ranges() # Filter: delete everything NOT in keep list ranges_to_delete = [ r for r in all_ranges if r.get("userID") not in keep_user_ids ] ranges_to_keep = [ r for r in all_ranges if r.get("userID") in keep_user_ids ] if not confirm: # Preview mode return { "status": "preview", "message": f"[WARNING] DELETION PREVIEW - Set confirm=True to delete", "keep_user_ids": keep_user_ids, "ranges_to_keep": len(ranges_to_keep), "ranges_to_delete": len(ranges_to_delete), "kept_ranges": [ { "user_id": r.get("userID"), "range_number": r.get("rangeNumber"), "status": r.get("rangeState"), } for r in ranges_to_keep ], "deleted_ranges": [ { "user_id": r.get("userID"), "range_number": r.get("rangeNumber"), "status": r.get("rangeState"), "vm_count": len(r.get("VMs", [])) } for r in ranges_to_delete ], "warning": f"These {len(ranges_to_delete)} ranges will be PERMANENTLY DELETED if you set confirm=True", "next_step": f"To delete, call: cleanup_old_ranges(keep_user_ids={keep_user_ids}, confirm=True)" } # Actually delete non-kept ranges deletion_results = [] for range_data in ranges_to_delete: user_id = range_data.get("userID") try: result = await handler.client.delete_range(user_id) deletion_results.append({ "user_id": user_id, "status": "deleted", "result": result }) except Exception as e: deletion_results.append({ "user_id": user_id, "status": "error", "error": str(e) }) successful = sum(1 for r in deletion_results if r["status"] == "deleted") failed = sum(1 for r in deletion_results if r["status"] == "error") return { "status": "completed", "message": f"[OK] Deleted {successful} ranges, kept {len(ranges_to_keep)} ranges", "kept_user_ids": keep_user_ids, "ranges_kept": len(ranges_to_keep), "ranges_deleted": successful, "ranges_failed": failed, "results": deletion_results } return mcp

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server