Skip to main content
Glama
infrastructure_crud.py (64.5 kB)
"""Infrastructure CRUD operations for complete network management.""" import json import uuid from datetime import datetime from typing import Any import asyncssh from .sitemap import NetworkSiteMap class InfrastructureManager: """Manages CRUD operations for infrastructure components.""" def __init__(self) -> None: self.sitemap = NetworkSiteMap() async def get_device_connection_info(self, device_id: int) -> dict[str, Any] | None: """Get SSH connection info for a device from the sitemap.""" devices = self.sitemap.get_all_devices() for device in devices: if device.get("id") == device_id: return { "hostname": device.get("connection_ip", device.get("hostname")), "username": "mcp_admin", # Use the admin account we set up "port": 22, } return None async def deploy_infrastructure_plan( deployment_plan: dict[str, Any], validate_only: bool = False ) -> str: """Deploy new infrastructure based on AI recommendations or user specifications.""" try: manager = InfrastructureManager() # Validate the deployment plan validation_result = await _validate_deployment_plan(deployment_plan) if not validation_result["valid"]: return json.dumps( { "status": "error", "message": f"Deployment plan validation failed: {validation_result['errors']}", } ) if validate_only: return json.dumps( { "status": "success", "message": "Deployment plan validation passed", "validation_result": validation_result, "estimated_duration": "15-30 minutes", "affected_devices": len( { service.get("target_device_id") for service in deployment_plan.get("services", []) } ), } ) # Execute deployment plan deployment_results = [] # Deploy services for service in deployment_plan.get("services", []): result = await _deploy_service(manager, service) deployment_results.append(result) # Apply network changes for network_change in deployment_plan.get("network_changes", []): result = await _apply_network_change(manager, network_change) deployment_results.append(result) # Update sitemap with new infrastructure await 
async def update_device_configuration(
    device_id: int,
    config_changes: dict[str, Any],
    backup_before_change: bool = True,
    validate_only: bool = False,
) -> str:
    """Update configuration of an existing device.

    Returns a JSON string describing the outcome; never raises.
    """
    try:
        manager = InfrastructureManager()

        connection_info = await manager.get_device_connection_info(device_id)
        if not connection_info:
            return json.dumps(
                {
                    "status": "error",
                    "message": f"Device with ID {device_id} not found in sitemap",
                }
            )

        validation_result = await _validate_config_changes(config_changes, device_id)
        if not validation_result["valid"]:
            return json.dumps(
                {
                    "status": "error",
                    "message": f"Configuration validation failed: {validation_result['errors']}",
                }
            )

        if validate_only:
            return json.dumps(
                {
                    "status": "success",
                    "message": "Configuration changes validated successfully",
                    "validation_result": validation_result,
                    "affected_services": validation_result.get("affected_services", []),
                    "estimated_downtime": validation_result.get(
                        "estimated_downtime", "None"
                    ),
                }
            )

        # Optionally snapshot the device before mutating it.
        backup_id = None
        if backup_before_change:
            backup_result = await create_infrastructure_backup(
                backup_scope="device_specific", device_ids=[device_id]
            )
            backup_data = json.loads(backup_result)
            if backup_data.get("status") == "success":
                backup_id = backup_data.get("backup_id")

        # NOTE(review): known_hosts=None disables SSH host-key verification —
        # confirm this is acceptable for the deployment environment.
        async with asyncssh.connect(
            connection_info["hostname"],
            username=connection_info["username"],
            known_hosts=None,
        ) as conn:
            change_results = []

            if "services" in config_changes:
                for service_name, service_config in config_changes["services"].items():
                    change_results.append(
                        await _update_service_config(conn, service_name, service_config)
                    )
            if "network" in config_changes:
                change_results.append(
                    await _update_network_config(conn, config_changes["network"])
                )
            if "security" in config_changes:
                change_results.append(
                    await _update_security_config(conn, config_changes["security"])
                )
            if "resources" in config_changes:
                change_results.append(
                    await _update_resource_config(conn, config_changes["resources"])
                )

        # Refresh the sitemap so it reflects the applied changes.
        await _rediscover_device_after_changes(manager, device_id, connection_info)

        successful_changes = [r for r in change_results if r.get("status") == "success"]
        failed_changes = [r for r in change_results if r.get("status") == "error"]

        return json.dumps(
            {
                "status": "success" if len(failed_changes) == 0 else "partial_success",
                "message": f"Applied {len(successful_changes)} configuration changes",
                "device_id": device_id,
                "backup_id": backup_id,
                "successful_changes": len(successful_changes),
                "failed_changes": len(failed_changes),
                "change_results": change_results,
            },
            indent=2,
        )
    except Exception as e:
        return json.dumps(
            {
                "status": "error",
                "message": f"Device configuration update failed: {str(e)}",
            }
        )
async def decommission_network_device(
    device_id: int,
    migration_plan: dict[str, Any] | None = None,
    force_removal: bool = False,
    validate_only: bool = False,
) -> str:
    """Safely remove a device from the network infrastructure.

    Refuses to proceed when critical services are present unless a migration
    plan is supplied or *force_removal* is set. Returns a JSON status string.
    """
    try:
        manager = InfrastructureManager()

        connection_info = await manager.get_device_connection_info(device_id)
        if not connection_info:
            return json.dumps(
                {
                    "status": "error",
                    "message": f"Device with ID {device_id} not found in sitemap",
                }
            )

        # Understand what would break if this device disappeared.
        dependencies = await _analyze_device_dependencies(manager, device_id)
        if (
            dependencies["critical_services"]
            and not migration_plan
            and not force_removal
        ):
            return json.dumps(
                {
                    "status": "error",
                    "message": "Device has critical services. Migration plan required.",
                    "critical_services": dependencies["critical_services"],
                    "dependent_devices": dependencies["dependent_devices"],
                }
            )

        if validate_only:
            return json.dumps(
                {
                    "status": "success",
                    "message": "Decommission plan validated",
                    "dependencies": dependencies,
                    "migration_required": len(dependencies["critical_services"]) > 0,
                    "estimated_migration_time": "30-60 minutes"
                    if migration_plan
                    else "N/A",
                }
            )

        decommission_results = []

        # Move services elsewhere first, unless the caller forced removal.
        if migration_plan and not force_removal:
            migration_results = await _execute_migration_plan(
                manager, device_id, migration_plan
            )
            decommission_results.extend(migration_results)

        async with asyncssh.connect(
            connection_info["hostname"],
            username=connection_info["username"],
            known_hosts=None,
        ) as conn:
            decommission_results.append(await _stop_all_device_services(conn))
            decommission_results.append(await _remove_from_clusters(conn))

        # Update sitemap to mark device as decommissioned
        # Note: This method doesn't exist in NetworkSiteMap, would need to be implemented
        # manager.sitemap.update_device_status(device_id, "decommissioned")

        return json.dumps(
            {
                "status": "success",
                "message": f"Device {device_id} successfully decommissioned",
                "device_id": device_id,
                "migration_executed": migration_plan is not None,
                "decommission_results": decommission_results,
                "next_steps": [
                    "Verify migrated services are running on target devices",
                    "Update monitoring and alerting configurations",
                    "Physically remove or repurpose the hardware",
                ],
            },
            indent=2,
        )
    except Exception as e:
        return json.dumps(
            {"status": "error", "message": f"Device decommissioning failed: {str(e)}"}
        )
f"Device {device_id} successfully decommissioned", "device_id": device_id, "migration_executed": migration_plan is not None, "decommission_results": decommission_results, "next_steps": [ "Verify migrated services are running on target devices", "Update monitoring and alerting configurations", "Physically remove or repurpose the hardware", ], }, indent=2, ) except Exception as e: return json.dumps( {"status": "error", "message": f"Device decommissioning failed: {str(e)}"} ) async def scale_infrastructure_services( scaling_plan: dict[str, Any], validate_only: bool = False ) -> str: """Scale services up or down based on resource analysis.""" try: manager = InfrastructureManager() # Validate scaling plan validation_result = await _validate_scaling_plan(scaling_plan) if not validation_result["valid"]: return json.dumps( { "status": "error", "message": f"Scaling plan validation failed: {validation_result['errors']}", } ) if validate_only: return json.dumps( { "status": "success", "message": "Scaling plan validated successfully", "validation_result": validation_result, "estimated_duration": "10-20 minutes", "resource_impact": validation_result.get("resource_impact", {}), } ) scaling_results = [] # Execute scale-up operations for scale_up in scaling_plan.get("scale_up", []): result = await _scale_service_up(manager, scale_up) scaling_results.append(result) # Execute scale-down operations for scale_down in scaling_plan.get("scale_down", []): result = await _scale_service_down(manager, scale_down) scaling_results.append(result) successful_scaling = [ r for r in scaling_results if r.get("status") == "success" ] failed_scaling = [r for r in scaling_results if r.get("status") == "error"] return json.dumps( { "status": "success" if len(failed_scaling) == 0 else "partial_success", "message": f"Completed {len(successful_scaling)} scaling operations", "successful_operations": len(successful_scaling), "failed_operations": len(failed_scaling), "scaling_results": scaling_results, }, 
async def validate_infrastructure_plan(
    change_plan: dict[str, Any], validation_level: str = "comprehensive"
) -> str:
    """Validate infrastructure changes before applying them.

    *validation_level* is "basic", "comprehensive", or "simulation"; each
    level includes all checks from the levels before it.
    """
    try:
        validation_results: dict[str, list[dict[str, Any]]] = {
            "basic": [],
            "comprehensive": [],
            "simulation": [],
        }

        validation_results["basic"] = await _perform_basic_validation(change_plan)

        if validation_level in ["comprehensive", "simulation"]:
            validation_results["comprehensive"] = await _perform_comprehensive_validation(
                change_plan
            )

        if validation_level == "simulation":
            validation_results["simulation"] = await _perform_simulation_validation(
                change_plan
            )

        all_checks: list[dict[str, Any]] = []
        for checks in validation_results.values():
            if checks:
                all_checks.extend(checks)

        failed_checks = [
            check for check in all_checks if not check.get("passed", False)
        ]
        warning_checks = [check for check in all_checks if check.get("warning", False)]

        return json.dumps(
            {
                "status": "success",
                "validation_level": validation_level,
                "overall_result": "passed" if len(failed_checks) == 0 else "failed",
                "total_checks": len(all_checks),
                "passed_checks": len(all_checks) - len(failed_checks),
                "failed_checks": len(failed_checks),
                "warning_checks": len(warning_checks),
                "validation_results": validation_results,
                "recommendations": _generate_validation_recommendations(all_checks),
            },
            indent=2,
        )
    except Exception as e:
        return json.dumps(
            {
                "status": "error",
                "message": f"Infrastructure validation failed: {str(e)}",
            }
        )


async def create_infrastructure_backup(
    backup_scope: str = "full",
    device_ids: list[int] | None = None,
    include_data: bool = False,
    backup_name: str | None = None,
) -> str:
    """Create a backup of current infrastructure state.

    *backup_scope* of "full" snapshots every known device; otherwise
    *device_ids* is required. Returns a JSON status string with the backup id.
    """
    try:
        manager = InfrastructureManager()

        # Caller-supplied name wins; otherwise derive a unique timestamped id.
        backup_id = (
            backup_name
            or f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}"
        )

        backup_data = {
            "backup_id": backup_id,
            "created_at": datetime.now().isoformat(),
            "scope": backup_scope,
            "include_data": include_data,
            "devices": {},
            "network_topology": {},
            "services": {},
        }

        # Determine which devices to backup
        if backup_scope == "full":
            devices = manager.sitemap.get_all_devices()
            target_device_ids = [device["id"] for device in devices]
        elif device_ids:
            target_device_ids = device_ids
        else:
            return json.dumps(
                {
                    "status": "error",
                    "message": "Device IDs required for partial or device-specific backup",
                }
            )

        for device_id in target_device_ids:
            device_backup = await _backup_device(manager, device_id, include_data)
            if isinstance(backup_data["devices"], dict):
                backup_data["devices"][str(device_id)] = device_backup

        backup_data["network_topology"] = await _backup_network_topology(manager)

        # Save backup (in a real implementation, this would go to persistent storage)
        backup_path = f"/tmp/infrastructure_backup_{backup_id}.json"
        with open(backup_path, "w") as f:
            json.dump(backup_data, f, indent=2)

        return json.dumps(
            {
                "status": "success",
                "message": "Infrastructure backup created successfully",
                "backup_id": backup_id,
                "backup_path": backup_path,
                "scope": backup_scope,
                "devices_backed_up": len(target_device_ids),
                "backup_size_mb": round(len(json.dumps(backup_data)) / 1024 / 1024, 2),
                "include_data": include_data,
            },
            indent=2,
        )
    except Exception as e:
        return json.dumps(
            {"status": "error", "message": f"Infrastructure backup failed: {str(e)}"}
        )
state.""" try: manager = InfrastructureManager() # Generate backup ID backup_id = ( backup_name or f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}" ) backup_data = { "backup_id": backup_id, "created_at": datetime.now().isoformat(), "scope": backup_scope, "include_data": include_data, "devices": {}, "network_topology": {}, "services": {}, } # Determine which devices to backup if backup_scope == "full": devices = manager.sitemap.get_all_devices() target_device_ids = [device["id"] for device in devices] elif device_ids: target_device_ids = device_ids else: return json.dumps( { "status": "error", "message": "Device IDs required for partial or device-specific backup", } ) # Backup each device for device_id in target_device_ids: device_backup = await _backup_device(manager, device_id, include_data) if isinstance(backup_data["devices"], dict): backup_data["devices"][str(device_id)] = device_backup # Backup network topology backup_data["network_topology"] = await _backup_network_topology(manager) # Save backup (in a real implementation, this would go to persistent storage) backup_path = f"/tmp/infrastructure_backup_{backup_id}.json" with open(backup_path, "w") as f: json.dump(backup_data, f, indent=2) return json.dumps( { "status": "success", "message": "Infrastructure backup created successfully", "backup_id": backup_id, "backup_path": backup_path, "scope": backup_scope, "devices_backed_up": len(target_device_ids), "backup_size_mb": round(len(json.dumps(backup_data)) / 1024 / 1024, 2), "include_data": include_data, }, indent=2, ) except Exception as e: return json.dumps( {"status": "error", "message": f"Infrastructure backup failed: {str(e)}"} ) async def rollback_infrastructure_to_backup( backup_id: str, rollback_scope: str = "full", device_ids: list[int] | None = None, validate_only: bool = False, ) -> str: """Rollback recent infrastructure changes.""" try: # Load backup data backup_path = f"/tmp/infrastructure_backup_{backup_id}.json" try: 
with open(backup_path) as f: backup_data = json.load(f) except FileNotFoundError: return json.dumps( {"status": "error", "message": f"Backup {backup_id} not found"} ) if validate_only: return json.dumps( { "status": "success", "message": "Rollback plan validated", "backup_id": backup_id, "backup_created": backup_data.get("created_at"), "rollback_scope": rollback_scope, "devices_to_rollback": len(device_ids) if device_ids else len(backup_data.get("devices", {})), "estimated_duration": "20-45 minutes", } ) manager = InfrastructureManager() rollback_results = [] # Determine which devices to rollback if rollback_scope == "full": target_device_ids = list(backup_data.get("devices", {}).keys()) elif device_ids: target_device_ids = [str(device_id) for device_id in device_ids] else: return json.dumps( { "status": "error", "message": "Device IDs required for partial or device-specific rollback", } ) # Rollback each device for device_id in target_device_ids: devices_data = backup_data.get("devices", {}) if isinstance(devices_data, dict) and device_id in devices_data: result = await _rollback_device( manager, int(device_id), devices_data[device_id] ) rollback_results.append(result) successful_rollbacks = [ r for r in rollback_results if r.get("status") == "success" ] failed_rollbacks = [r for r in rollback_results if r.get("status") == "error"] return json.dumps( { "status": "success" if len(failed_rollbacks) == 0 else "partial_success", "message": f"Rolled back {len(successful_rollbacks)} devices successfully", "backup_id": backup_id, "rollback_scope": rollback_scope, "successful_rollbacks": len(successful_rollbacks), "failed_rollbacks": len(failed_rollbacks), "rollback_results": rollback_results, }, indent=2, ) except Exception as e: return json.dumps( {"status": "error", "message": f"Infrastructure rollback failed: {str(e)}"} ) # Helper functions (simplified implementations) async def _validate_deployment_plan(plan: dict[str, Any]) -> dict[str, Any]: """Validate a 
deployment plan.""" errors = [] warnings = [] # Check required fields if "services" not in plan and "network_changes" not in plan: errors.append("Deployment plan must include either services or network_changes") # Validate services if "services" in plan: for i, service in enumerate(plan["services"]): service_errors = [] # Required fields if "name" not in service: service_errors.append(f"Service {i}: 'name' is required") if "type" not in service: service_errors.append(f"Service {i}: 'type' is required") elif service["type"] not in ["docker", "lxd", "service"]: service_errors.append( f"Service {i}: type must be 'docker', 'lxd', or 'service'" ) if "target_device_id" not in service: service_errors.append(f"Service {i}: 'target_device_id' is required") # Docker-specific validation if service.get("type") == "docker": config = service.get("config", {}) if "image" not in config: warnings.append( f"Service {i}: Docker image not specified, will use default" ) # Validate port mappings ports = config.get("ports", []) for port in ports: if ":" not in str(port): service_errors.append( f"Service {i}: Invalid port mapping '{port}' (expected 'host:container')" ) # Service-specific validation elif service.get("type") == "service": config = service.get("config", {}) if "service_file" not in config: service_errors.append( f"Service {i}: 'service_file' is required for systemd services" ) errors.extend(service_errors) # Validate network changes if "network_changes" in plan: for i, change in enumerate(plan["network_changes"]): if "action" not in change: errors.append(f"Network change {i}: 'action' is required") elif change["action"] not in [ "create_vlan", "configure_firewall", "setup_routing", ]: errors.append( f"Network change {i}: Invalid action '{change['action']}'" ) if "target_device_id" not in change: errors.append(f"Network change {i}: 'target_device_id' is required") return {"valid": len(errors) == 0, "errors": errors, "warnings": warnings} async def _deploy_service( manager: 
InfrastructureManager, service: dict[str, Any] ) -> dict[str, Any]: """Deploy a single service.""" try: device_id = service["target_device_id"] connection_info = await manager.get_device_connection_info(device_id) if not connection_info: return { "status": "error", "service": service["name"], "error": f"Device {device_id} not found", } async with asyncssh.connect( connection_info["hostname"], username=connection_info["username"], known_hosts=None, ) as conn: service_type = service["type"] service_name = service["name"] config = service.get("config", {}) if service_type == "docker": # Deploy Docker container docker_image = config.get("image", "nginx:latest") ports = config.get("ports", []) volumes = config.get("volumes", []) env_vars = config.get("environment", {}) # Build docker run command cmd_parts = ["docker", "run", "-d", "--name", service_name] # Add port mappings for port_mapping in ports: cmd_parts.extend(["-p", port_mapping]) # Add volume mounts for volume in volumes: cmd_parts.extend(["-v", volume]) # Add environment variables for key, value in env_vars.items(): cmd_parts.extend(["-e", f"{key}={value}"]) cmd_parts.append(docker_image) result = await conn.run(" ".join(cmd_parts)) if result.exit_status == 0: stdout_text = ( result.stdout.decode() if isinstance(result.stdout, bytes) else str(result.stdout) if result.stdout else "" ) return { "status": "success", "service": service_name, "container_id": stdout_text.strip(), } else: return { "status": "error", "service": service_name, "error": result.stderr, } elif service_type == "lxd": # Deploy LXD container lxd_image = config.get("image", "ubuntu:22.04") # Launch LXD container result = await conn.run(f"lxc launch {lxd_image} {service_name}") if result.exit_status == 0: return { "status": "success", "service": service_name, "container": service_name, } else: return { "status": "error", "service": service_name, "error": result.stderr, } elif service_type == "service": # Deploy systemd service service_file = 
config.get("service_file", "") if not service_file: return { "status": "error", "service": service_name, "error": "Service file content required", } # Write service file await conn.run( f'echo "{service_file}" | sudo tee /etc/systemd/system/{service_name}.service' ) await conn.run("sudo systemctl daemon-reload") await conn.run(f"sudo systemctl enable {service_name}") result = await conn.run(f"sudo systemctl start {service_name}") if result.exit_status == 0: return { "status": "success", "service": service_name, "systemd_service": service_name, } else: return { "status": "error", "service": service_name, "error": result.stderr, } else: return { "status": "error", "service": service_name, "error": f"Unknown service type: {service_type}", } except Exception as e: return { "status": "error", "service": service.get("name", "unknown"), "error": str(e), } async def _apply_network_change( manager: InfrastructureManager, change: dict[str, Any] ) -> dict[str, Any]: """Apply a network configuration change.""" return {"status": "success", "change": change["action"]} async def _update_sitemap_after_deployment( manager: InfrastructureManager, results: list[dict[str, Any]] ) -> None: """Update sitemap after deployment.""" pass async def _validate_config_changes( changes: dict[str, Any], device_id: int ) -> dict[str, Any]: """Validate configuration changes.""" errors = [] warnings = [] affected_services = [] estimated_downtime = "None" # Validate services changes if "services" in changes: for service_name, service_config in changes["services"].items(): affected_services.append(service_name) if "type" in service_config: if service_config["type"] not in ["docker", "lxd", "systemd"]: errors.append( f"Invalid service type '{service_config['type']}' for {service_name}" ) # Docker validation if service_config.get("type") == "docker": if "image" in service_config: estimated_downtime = "2-5 minutes" if "ports" in service_config: for port in service_config["ports"]: if ":" not in 
str(port): errors.append( f"Invalid port mapping '{port}' for {service_name}" ) # Validate network changes if "network" in changes: warnings.append("Network changes may affect connectivity") if estimated_downtime == "None": estimated_downtime = "1-2 minutes" # Validate security changes if "security" in changes: warnings.append("Security changes may affect service access") if "firewall" in changes["security"]: warnings.append("Firewall changes may block existing connections") # Validate resource changes if "resources" in changes: if "memory" in changes["resources"]: memory_change = changes["resources"]["memory"] if isinstance(memory_change, str) and memory_change.endswith("G"): try: memory_gb = float(memory_change[:-1]) if memory_gb > 32: warnings.append(f"High memory allocation: {memory_change}") except ValueError: errors.append(f"Invalid memory format: {memory_change}") return { "valid": len(errors) == 0, "errors": errors, "warnings": warnings, "affected_services": affected_services, "estimated_downtime": estimated_downtime, } async def _update_service_config( conn: Any, service_name: str, config: dict[str, Any] ) -> dict[str, Any]: """Update service configuration.""" try: service_type = config.get("type", "docker") if service_type == "docker": # Update Docker container configuration # Check if container exists result = await conn.run(f"docker inspect {service_name}") if result.exit_status != 0: return { "status": "error", "service": service_name, "error": "Container not found", } # Stop existing container await conn.run(f"docker stop {service_name}") await conn.run(f"docker rm {service_name}") # Recreate with new configuration docker_image = config.get("image", "nginx:latest") ports = config.get("ports", []) volumes = config.get("volumes", []) env_vars = config.get("environment", {}) cmd_parts = ["docker", "run", "-d", "--name", service_name] for port_mapping in ports: cmd_parts.extend(["-p", port_mapping]) for volume in volumes: cmd_parts.extend(["-v", volume]) 
for key, value in env_vars.items(): cmd_parts.extend(["-e", f"{key}={value}"]) cmd_parts.append(docker_image) result = await conn.run(" ".join(cmd_parts)) if result.exit_status == 0: return { "status": "success", "service": service_name, "action": "updated", } else: return { "status": "error", "service": service_name, "error": result.stderr, } elif service_type == "systemd": # Update systemd service configuration service_file = config.get("service_file", "") if service_file: await conn.run( f'echo "{service_file}" | sudo tee /etc/systemd/system/{service_name}.service' ) await conn.run("sudo systemctl daemon-reload") result = await conn.run(f"sudo systemctl restart {service_name}") if result.exit_status == 0: return { "status": "success", "service": service_name, "action": "updated", } else: return { "status": "error", "service": service_name, "error": result.stderr, } # Update service state if "enabled" in config: if config["enabled"]: await conn.run(f"sudo systemctl enable {service_name}") else: await conn.run(f"sudo systemctl disable {service_name}") if "running" in config: if config["running"]: result = await conn.run(f"sudo systemctl start {service_name}") else: result = await conn.run(f"sudo systemctl stop {service_name}") if result.exit_status == 0: return { "status": "success", "service": service_name, "action": "state_updated", } else: return { "status": "error", "service": service_name, "error": result.stderr, } return { "status": "success", "service": service_name, "action": "config_updated", } else: return { "status": "error", "service": service_name, "error": f"Unknown service type: {service_type}", } except Exception as e: return {"status": "error", "service": service_name, "error": str(e)} async def _update_network_config(conn: Any, config: dict[str, Any]) -> dict[str, Any]: """Update network configuration.""" return {"status": "success", "component": "network"} async def _update_security_config(conn: Any, config: dict[str, Any]) -> dict[str, Any]: 
"""Update security configuration.""" return {"status": "success", "component": "security"} async def _update_resource_config(conn: Any, config: dict[str, Any]) -> dict[str, Any]: """Update resource configuration.""" return {"status": "success", "component": "resources"} async def _rediscover_device_after_changes( manager: InfrastructureManager, device_id: int, connection_info: dict[str, Any] ) -> None: """Rediscover device after configuration changes.""" pass async def _analyze_device_dependencies( manager: InfrastructureManager, device_id: int ) -> dict[str, Any]: """Analyze device dependencies.""" try: connection_info = await manager.get_device_connection_info(device_id) if not connection_info: return { "critical_services": [], "dependent_devices": [], "error": "Device not found", } critical_services = [] dependent_devices = [] async with asyncssh.connect( connection_info["hostname"], username=connection_info["username"], known_hosts=None, ) as conn: # Check for running Docker containers docker_result = await conn.run('docker ps --format "{{.Names}}"') if docker_result.exit_status == 0 and docker_result.stdout: stdout_text = ( docker_result.stdout.decode() if isinstance(docker_result.stdout, bytes) else str(docker_result.stdout) ) if stdout_text.strip(): container_names = stdout_text.strip().split("\n") for container_name in container_names: if container_name.strip(): # Check if container has exposed ports (likely critical) port_result = await conn.run(f"docker port {container_name}") if port_result.exit_status == 0 and port_result.stdout: stdout_text = ( port_result.stdout.decode() if isinstance(port_result.stdout, bytes) else str(port_result.stdout) ) if stdout_text.strip(): critical_services.append( { "name": container_name, "type": "docker", "reason": "Has exposed ports - likely provides external services", "ports": stdout_text.strip().split("\n"), } ) # Check for running LXD containers lxd_result = await conn.run("lxc list --format csv -c ns | grep RUNNING") 
if lxd_result.exit_status == 0 and lxd_result.stdout: stdout_text = ( lxd_result.stdout.decode() if isinstance(lxd_result.stdout, bytes) else str(lxd_result.stdout) ) if stdout_text.strip(): for line in stdout_text.strip().split("\n"): if line.strip(): container_name = line.split(",")[0] critical_services.append( { "name": container_name, "type": "lxd", "reason": "Running LXD container", } ) # Check for critical systemd services critical_service_patterns = [ "nginx", "apache2", "mysql", "postgresql", "redis", "mongodb", "docker", "k3s", "kubernetes", "prometheus", "grafana", ] for pattern in critical_service_patterns: service_result = await conn.run( f"systemctl is-active {pattern} 2>/dev/null" ) if service_result.exit_status == 0 and service_result.stdout: stdout_text = ( service_result.stdout.decode() if isinstance(service_result.stdout, bytes) else str(service_result.stdout) ) if stdout_text.strip() == "active": critical_services.append( { "name": pattern, "type": "systemd", "reason": "Critical infrastructure service", } ) # Check for services listening on network ports netstat_result = await conn.run("ss -tlnp 2>/dev/null | grep LISTEN") if netstat_result.exit_status == 0 and netstat_result.stdout: listening_ports: list[str] = [] stdout_text = ( netstat_result.stdout.decode() if isinstance(netstat_result.stdout, bytes) else str(netstat_result.stdout) ) for line in stdout_text.strip().split("\n"): if "LISTEN" in line: parts = line.split() if len(parts) >= 4: addr_port = parts[3] if ":" in addr_port: port = addr_port.split(":")[-1] if port not in ["22", "53"]: # Skip SSH and DNS listening_ports.append(port) if listening_ports: critical_services.append( { "name": "network_services", "type": "network", "reason": f"Listening on ports: {', '.join(listening_ports)}", "ports": listening_ports, } ) # Analyze network dependencies (simplified) # In a real implementation, this would check the network topology # and identify devices that depend on this device for routing, 
async def _execute_migration_plan(
    manager: InfrastructureManager, device_id: int, plan: dict[str, Any]
) -> list[dict[str, Any]]:
    """Execute service migration plan.

    For each entry in ``plan["service_mapping"]`` (service name -> target
    device id), moves the Docker or LXD container from the source device to
    the target. Per-service failures are collected, not raised.
    """
    results: list[dict[str, Any]] = []
    try:
        source_connection_info = await manager.get_device_connection_info(device_id)
        if not source_connection_info:
            return [
                {"status": "error", "error": f"Source device {device_id} not found"}
            ]

        service_mapping = plan.get("service_mapping", {})
        for service_name, target_device_id in service_mapping.items():
            try:
                target_connection_info = await manager.get_device_connection_info(
                    target_device_id
                )
                if not target_connection_info:
                    results.append(
                        {
                            "status": "error",
                            "service": service_name,
                            "error": f"Target device {target_device_id} not found",
                        }
                    )
                    continue

                async with asyncssh.connect(
                    source_connection_info["hostname"],
                    username=source_connection_info["username"],
                    known_hosts=None,
                ) as source_conn:
                    # Prefer Docker; fall back to LXD below.
                    inspect_result = await source_conn.run(
                        f"docker inspect {service_name}"
                    )
                    if inspect_result.exit_status == 0:
                        # Snapshot the container and export it as a tarball.
                        await source_conn.run(
                            f"docker commit {service_name} {service_name}_migration"
                        )
                        save_result = await source_conn.run(
                            f"docker save {service_name}_migration | gzip > /tmp/{service_name}_migration.tar.gz"
                        )
                        if save_result.exit_status == 0:
                            async with asyncssh.connect(
                                target_connection_info["hostname"],
                                username=target_connection_info["username"],
                                known_hosts=None,
                            ) as target_conn:
                                # Relay the image through the local host.
                                async with source_conn.start_sftp_client() as source_sftp:
                                    await source_sftp.get(
                                        f"/tmp/{service_name}_migration.tar.gz",
                                        f"/tmp/{service_name}_migration.tar.gz",
                                    )
                                async with target_conn.start_sftp_client() as target_sftp:
                                    await target_sftp.put(
                                        f"/tmp/{service_name}_migration.tar.gz",
                                        f"/tmp/{service_name}_migration.tar.gz",
                                    )

                                await target_conn.run(
                                    f"gunzip -c /tmp/{service_name}_migration.tar.gz | docker load"
                                )
                                run_result = await target_conn.run(
                                    f"docker run -d --name {service_name} {service_name}_migration"
                                )
                                if run_result.exit_status == 0:
                                    # Only retire the source copy once the
                                    # target copy is confirmed running.
                                    await source_conn.run(
                                        f"docker stop {service_name}"
                                    )
                                    await source_conn.run(f"docker rm {service_name}")
                                    results.append(
                                        {
                                            "status": "success",
                                            "service": service_name,
                                            "migration": f"moved from device {device_id} to device {target_device_id}",
                                        }
                                    )
                                else:
                                    results.append(
                                        {
                                            "status": "error",
                                            "service": service_name,
                                            "error": f"Failed to start on target: {run_result.stderr.decode() if isinstance(run_result.stderr, bytes) else str(run_result.stderr)}",
                                        }
                                    )
                        else:
                            results.append(
                                {
                                    "status": "error",
                                    "service": service_name,
                                    "error": "Failed to export container",
                                }
                            )
                    else:
                        # Not a Docker container — try LXD.
                        lxc_result = await source_conn.run(f"lxc info {service_name}")
                        if lxc_result.exit_status == 0:
                            copy_result = await source_conn.run(
                                f"lxc copy {service_name} {target_connection_info['hostname']}:{service_name}"
                            )
                            if copy_result.exit_status == 0:
                                async with asyncssh.connect(
                                    target_connection_info["hostname"],
                                    username=target_connection_info["username"],
                                    known_hosts=None,
                                ) as target_conn:
                                    await target_conn.run(f"lxc start {service_name}")
                                    await source_conn.run(f"lxc stop {service_name}")
                                    await source_conn.run(
                                        f"lxc delete {service_name}"
                                    )
                                results.append(
                                    {
                                        "status": "success",
                                        "service": service_name,
                                        "migration": f"LXD container moved from device {device_id} to device {target_device_id}",
                                    }
                                )
                            else:
                                results.append(
                                    {
                                        "status": "error",
                                        "service": service_name,
                                        "error": "Failed to copy LXD container",
                                    }
                                )
                        else:
                            results.append(
                                {
                                    "status": "error",
                                    "service": service_name,
                                    "error": "Service not found (not Docker or LXD)",
                                }
                            )
            except Exception as e:
                results.append(
                    {"status": "error", "service": service_name, "error": str(e)}
                )

        return results
    except Exception as e:
        return [
            {"status": "error", "error": f"Migration plan execution failed: {str(e)}"}
        ]
source_conn.run( f"lxc copy {service_name} {target_connection_info['hostname']}:{service_name}" ) if copy_result.exit_status == 0: # Start on target and stop on source async with asyncssh.connect( target_connection_info["hostname"], username=target_connection_info["username"], known_hosts=None, ) as target_conn: await target_conn.run(f"lxc start {service_name}") await source_conn.run(f"lxc stop {service_name}") await source_conn.run(f"lxc delete {service_name}") results.append( { "status": "success", "service": service_name, "migration": f"LXD container moved from device {device_id} to device {target_device_id}", } ) else: results.append( { "status": "error", "service": service_name, "error": "Failed to copy LXD container", } ) else: results.append( { "status": "error", "service": service_name, "error": "Service not found (not Docker or LXD)", } ) except Exception as e: results.append( {"status": "error", "service": service_name, "error": str(e)} ) return results except Exception as e: return [ {"status": "error", "error": f"Migration plan execution failed: {str(e)}"} ] async def _stop_all_device_services(conn: Any) -> dict[str, Any]: """Stop all services on a device.""" return {"status": "success", "action": "services_stopped"} async def _remove_from_clusters(conn: Any) -> dict[str, Any]: """Remove device from clusters.""" return {"status": "success", "action": "removed_from_clusters"} async def _validate_scaling_plan(plan: dict[str, Any]) -> dict[str, Any]: """Validate a scaling plan.""" return {"valid": True, "errors": [], "resource_impact": {}} async def _scale_service_up( manager: InfrastructureManager, scale_up: dict[str, Any] ) -> dict[str, Any]: """Scale a service up.""" return { "status": "success", "action": "scale_up", "service": scale_up.get("service_name"), } async def _scale_service_down( manager: InfrastructureManager, scale_down: dict[str, Any] ) -> dict[str, Any]: """Scale a service down.""" return { "status": "success", "action": "scale_down", 
"service": scale_down.get("service_name"), } async def _perform_basic_validation(plan: dict[str, Any]) -> list[dict[str, Any]]: """Perform basic validation checks.""" return [{"check": "syntax", "passed": True}] async def _perform_comprehensive_validation( plan: dict[str, Any], ) -> list[dict[str, Any]]: """Perform comprehensive validation checks.""" return [{"check": "dependencies", "passed": True}] async def _perform_simulation_validation(plan: dict[str, Any]) -> list[dict[str, Any]]: """Perform simulation validation.""" return [{"check": "simulation", "passed": True}] def _generate_validation_recommendations(checks: list[dict[str, Any]]) -> list[str]: """Generate validation recommendations.""" return ["All validations passed"] async def _backup_device( manager: InfrastructureManager, device_id: int, include_data: bool ) -> dict[str, Any]: """Backup a single device.""" try: connection_info = await manager.get_device_connection_info(device_id) if not connection_info: return { "device_id": device_id, "backed_up": False, "error": "Device not found", } backup_data = { "device_id": device_id, "connection_info": connection_info, "services": {}, "system_config": {}, "network_config": {}, "backed_up_at": datetime.now().isoformat(), } async with asyncssh.connect( connection_info["hostname"], username=connection_info["username"], known_hosts=None, ) as conn: # Backup Docker containers docker_result = await conn.run('docker ps -a --format "{{.Names}}"') if docker_result.exit_status == 0 and docker_result.stdout: stdout_text = ( docker_result.stdout.decode() if isinstance(docker_result.stdout, bytes) else str(docker_result.stdout) ) if stdout_text.strip(): container_names = stdout_text.strip().split("\n") for container_name in container_names: if container_name.strip(): inspect_result = await conn.run( f"docker inspect {container_name}" ) if inspect_result.exit_status == 0: stdout_text = ( inspect_result.stdout.decode() if isinstance(inspect_result.stdout, bytes) else 
str(inspect_result.stdout) ) if isinstance(backup_data["services"], dict): backup_data["services"][container_name] = { "type": "docker", "config": stdout_text, "backed_up": True, } if include_data: # Export container data export_result = await conn.run( f"docker export {container_name} | gzip > /tmp/backup_{container_name}.tar.gz" ) if ( isinstance(backup_data["services"], dict) and container_name in backup_data["services"] ): backup_data["services"][container_name][ "data_backup" ] = export_result.exit_status == 0 # Backup LXD containers lxd_result = await conn.run("lxc list --format csv -c n") if lxd_result.exit_status == 0 and lxd_result.stdout: stdout_text = ( lxd_result.stdout.decode() if isinstance(lxd_result.stdout, bytes) else str(lxd_result.stdout) ) if stdout_text.strip(): container_names = stdout_text.strip().split("\n") for container_name in container_names: if container_name.strip(): info_result = await conn.run( f"lxc config show {container_name}" ) if info_result.exit_status == 0: stdout_text = ( info_result.stdout.decode() if isinstance(info_result.stdout, bytes) else str(info_result.stdout) ) if isinstance(backup_data["services"], dict): backup_data["services"][container_name] = { "type": "lxd", "config": stdout_text, "backed_up": True, } if include_data: # Export LXD container export_result = await conn.run( f"lxc export {container_name} /tmp/backup_{container_name}.tar.gz" ) if ( isinstance(backup_data["services"], dict) and container_name in backup_data["services"] ): backup_data["services"][container_name][ "data_backup" ] = export_result.exit_status == 0 # Backup systemd services systemd_result = await conn.run( "systemctl list-units --type=service --state=loaded --no-pager --plain | grep -v LOAD" ) if systemd_result.exit_status == 0 and systemd_result.stdout: stdout_text = ( systemd_result.stdout.decode() if isinstance(systemd_result.stdout, bytes) else str(systemd_result.stdout) ) service_lines = stdout_text.strip().split("\n") for line in 
service_lines: if line.strip(): parts = line.split() if not parts: continue service_name = parts[0] if not service_name.endswith(".service"): continue service_file_result = await conn.run( f"systemctl cat {service_name}" ) if ( service_file_result.exit_status == 0 and service_file_result.stdout ): stdout_text = ( service_file_result.stdout.decode() if isinstance(service_file_result.stdout, bytes) else str(service_file_result.stdout) ) if isinstance(backup_data["services"], dict): backup_data["services"][service_name] = { "type": "systemd", "config": stdout_text, "backed_up": True, } # Backup network configuration network_configs = {} # Network interfaces interfaces_result = await conn.run( 'cat /etc/netplan/*.yaml 2>/dev/null || cat /etc/network/interfaces 2>/dev/null || echo "No network config found"' ) if interfaces_result.exit_status == 0: network_configs["interfaces"] = interfaces_result.stdout # Firewall rules ufw_result = await conn.run( 'sudo ufw status numbered 2>/dev/null || echo "UFW not available"' ) if ufw_result.exit_status == 0: network_configs["firewall"] = ufw_result.stdout # DNS configuration dns_result = await conn.run("cat /etc/resolv.conf") if dns_result.exit_status == 0: network_configs["dns"] = dns_result.stdout backup_data["network_config"] = network_configs # Backup system configuration system_configs = {} # Crontab cron_result = await conn.run('crontab -l 2>/dev/null || echo "No crontab"') if cron_result.exit_status == 0: system_configs["crontab"] = cron_result.stdout # SSH configuration ssh_result = await conn.run("sudo cat /etc/ssh/sshd_config") if ssh_result.exit_status == 0: system_configs["ssh"] = ssh_result.stdout backup_data["system_config"] = system_configs return backup_data except Exception as e: return {"device_id": device_id, "backed_up": False, "error": str(e)} async def _backup_network_topology(manager: InfrastructureManager) -> dict[str, Any]: """Backup network topology.""" return {"topology": "backed_up"} async def 
_rollback_device( manager: InfrastructureManager, device_id: int, backup_data: dict[str, Any] ) -> dict[str, Any]: """Rollback a single device.""" return {"status": "success", "device_id": device_id}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/washyu/mcp_python_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.