risk_assessment.py (12.8 kB)
""" Stack Risk Assessment Module Risk analysis and mitigation recommendations for Docker stack migrations. Evaluates data size, downtime, critical files, and service complexity. """ import structlog from ...utils import format_size class StackRiskAssessment: """Risk assessment and mitigation planning for stack migrations.""" def __init__(self): self.logger = structlog.get_logger() def assess_migration_risks( self, stack_name: str, data_size_bytes: int, estimated_downtime: float, source_inventory: dict = None, compose_content: str = "", ) -> dict: """Assess risks associated with the migration. Args: stack_name: Name of the stack being migrated data_size_bytes: Size of data to migrate estimated_downtime: Estimated downtime in seconds source_inventory: Source data inventory from migration manager compose_content: Docker Compose file content Returns: Dict with risk assessment details """ risks = { "overall_risk": "LOW", "risk_factors": [], "warnings": [], "recommendations": [], "critical_files": [], "rollback_plan": [], } # Assess each risk factor self._assess_data_size_risk(risks, data_size_bytes) self._assess_downtime_risk(risks, estimated_downtime) self._assess_critical_files_risk(risks, source_inventory) self._assess_compose_complexity_risk(risks, compose_content) # Generate rollback plan and additional recommendations self._generate_rollback_plan(risks) self._add_risk_based_recommendations(risks) return risks def _assess_data_size_risk(self, risks: dict, data_size_bytes: int) -> None: """Assess risk based on data size.""" if data_size_bytes > 50 * 1024**3: # > 50GB risks["risk_factors"].append("LARGE_DATASET") risks["warnings"].append( f"Large dataset ({format_size(data_size_bytes)}) - increased transfer time and failure risk" ) risks["recommendations"].append("Consider migrating during maintenance window") risks["overall_risk"] = "HIGH" elif data_size_bytes > 10 * 1024**3: # > 10GB risks["risk_factors"].append("MODERATE_DATASET") risks["warnings"].append( f"Moderate dataset ({format_size(data_size_bytes)}) - plan for extended transfer time" ) risks["overall_risk"] = "MEDIUM" def _assess_downtime_risk(self, risks: dict, estimated_downtime: float) -> None: """Assess risk based on estimated downtime.""" if estimated_downtime > 3600: # > 1 hour risks["risk_factors"].append("LONG_DOWNTIME") risks["warnings"].append( f"Extended downtime expected ({self._format_time(estimated_downtime)})" ) risks["recommendations"].append("Schedule migration during low-usage period") if risks["overall_risk"] == "LOW": risks["overall_risk"] = "MEDIUM" elif estimated_downtime > 600: # > 10 minutes risks["risk_factors"].append("MODERATE_DOWNTIME") risks["warnings"].append( f"Moderate downtime expected ({self._format_time(estimated_downtime)})" ) def _assess_critical_files_risk(self, risks: dict, source_inventory: dict) -> None: """Assess risk based on critical files in source inventory.""" if not source_inventory or not source_inventory.get("critical_files"): return critical_files = source_inventory["critical_files"] # Identify database files db_files = [ f for f in critical_files.keys() if any(ext in f.lower() for ext in [".db", ".sql", ".sqlite", "database"]) ] # Identify configuration files config_files = [ f for f in critical_files.keys() if any(ext in f.lower() for ext in [".conf", ".config", ".env", ".yaml", ".json"]) ] if db_files: risks["risk_factors"].append("DATABASE_FILES") risks["warnings"].append( f"Database files detected ({len(db_files)} files) - data corruption risk if not properly stopped" ) 
risks["recommendations"].append( "Ensure all database connections are closed before migration" ) risks["critical_files"].extend(db_files) if risks["overall_risk"] == "LOW": risks["overall_risk"] = "MEDIUM" if config_files: risks["critical_files"].extend(config_files) if len(critical_files) > 20: risks["risk_factors"].append("MANY_CRITICAL_FILES") risks["warnings"].append( f"Many critical files ({len(critical_files)}) - increased complexity" ) def _assess_compose_complexity_risk(self, risks: dict, compose_content: str) -> None: """Assess risk based on Docker Compose file complexity.""" if not compose_content: return try: import yaml compose_data = yaml.safe_load(compose_content) services = compose_data.get("services", {}) # Assess different aspects of compose complexity self._assess_persistent_volume_risk(risks, services) self._assess_health_check_complexity(risks, services) except Exception as e: # Skip compose analysis if parsing fails self.logger.debug("Failed to analyze compose content for risks", error=str(e)) def _assess_persistent_volume_risk(self, risks: dict, services: dict) -> None: """Assess risk from services with persistent volumes.""" persistent_services = [] for service_name, service_config in services.items(): volumes = service_config.get("volumes", []) if volumes: persistent_services.append(service_name) if persistent_services: risks["risk_factors"].append("PERSISTENT_SERVICES") if len(persistent_services) > 3: risks["warnings"].append( f"Multiple services with persistent data ({len(persistent_services)} services)" ) if risks["overall_risk"] == "LOW": risks["overall_risk"] = "MEDIUM" def _assess_health_check_complexity(self, risks: dict, services: dict) -> None: """Assess complexity from services with health checks.""" health_checked_services = [] for service_name, service_config in services.items(): if "healthcheck" in service_config: health_checked_services.append(service_name) if health_checked_services: risks["recommendations"].append( "Monitor health checks after migration - services may need time to stabilize" ) def _generate_rollback_plan(self, risks: dict) -> None: """Generate a rollback plan for the migration.""" risks["rollback_plan"] = [ "1. Stop target stack immediately if issues detected", "2. Verify source stack can be restarted on original host", "3. Restore from backup if target data was corrupted", "4. Update DNS/load balancer to point back to source", "5. Monitor source services for stability after rollback", ] def _add_risk_based_recommendations(self, risks: dict) -> None: """Add recommendations based on overall risk level.""" if risks["overall_risk"] == "HIGH": risks["recommendations"].extend( [ "Create full backup before starting migration", "Test rollback procedure in non-production environment", "Have technical team available during migration", "Consider incremental migration approach for large datasets", ] ) elif risks["overall_risk"] == "MEDIUM": risks["recommendations"].extend( [ "Create backup before starting migration", "Monitor migration progress closely", "Prepare rollback steps in advance", ] ) def _format_time(self, seconds: float) -> str: """Format seconds into human-readable time string.""" if seconds < 60: return f"{seconds:.1f}s" elif seconds < 3600: minutes = seconds / 60 return f"{minutes:.1f}m" elif seconds < 86400: hours = seconds / 3600 return f"{hours:.1f}h" else: days = seconds / 86400 return f"{days:.1f}d" def calculate_risk_score(self, risks: dict) -> int: """Calculate a numerical risk score (0-100). 
Args: risks: Risk assessment dictionary Returns: Risk score from 0 (lowest risk) to 100 (highest risk) """ score = 0 risk_factors = risks.get("risk_factors", []) # Base score based on risk factors factor_scores = { "LARGE_DATASET": 30, "MODERATE_DATASET": 15, "LONG_DOWNTIME": 25, "MODERATE_DOWNTIME": 10, "DATABASE_FILES": 20, "MANY_CRITICAL_FILES": 10, "PERSISTENT_SERVICES": 10, } for factor in risk_factors: score += factor_scores.get(factor, 5) # Cap at 100 return min(score, 100) def generate_mitigation_plan(self, risks: dict) -> dict: """Generate specific mitigation strategies for identified risks. Args: risks: Risk assessment dictionary Returns: Dict with mitigation strategies per risk factor """ mitigation_plan = { "pre_migration": [], "during_migration": [], "post_migration": [], "contingency": [], } risk_factors = risks.get("risk_factors", []) for factor in risk_factors: if factor == "LARGE_DATASET": mitigation_plan["pre_migration"].extend( [ "Schedule during off-peak hours", "Verify network bandwidth between hosts", "Create incremental backup strategy", ] ) mitigation_plan["during_migration"].extend( [ "Monitor transfer progress every 30 minutes", "Have fallback communication plan ready", ] ) elif factor == "DATABASE_FILES": mitigation_plan["pre_migration"].extend( [ "Create database dump/export", "Verify all connections are closed", "Test database startup on target", ] ) mitigation_plan["post_migration"].extend( ["Verify database integrity after migration", "Run database consistency checks"] ) return mitigation_plan def assess_rollback_feasibility(self, risks: dict, migration_params: dict) -> dict: """Assess how feasible a rollback would be. Args: risks: Risk assessment dictionary migration_params: Migration parameters (remove_source, etc.) Returns: Dict with rollback feasibility assessment """ feasibility = { "rollback_possible": True, "rollback_time_estimate": "5-15 minutes", "rollback_risks": [], "preparation_required": [], } # If source will be removed, rollback is more complex if migration_params.get("remove_source", False): feasibility["rollback_possible"] = False feasibility["rollback_risks"].append( "Source stack will be deleted - rollback requires restore from backup" ) feasibility["preparation_required"].append( "Ensure backup is created and tested before migration" ) # Large datasets make rollback slower if "LARGE_DATASET" in risks.get("risk_factors", []): feasibility["rollback_time_estimate"] = "30-60 minutes" feasibility["rollback_risks"].append("Large dataset restore will take significant time") return feasibility
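A minimal usage sketch follows, assuming the class can be imported from its package (structlog and PyYAML installed); the stack name, data size, downtime, inventory, and compose snippet below are hypothetical values chosen to trigger a few risk factors:

# Hypothetical usage; StackRiskAssessment is assumed importable from its package.
assessor = StackRiskAssessment()

risks = assessor.assess_migration_risks(
    stack_name="media-stack",           # hypothetical stack name
    data_size_bytes=20 * 1024**3,       # 20 GB -> MODERATE_DATASET
    estimated_downtime=1800,            # 30 minutes -> MODERATE_DOWNTIME
    source_inventory={"critical_files": {"config/app.db": 4096}},  # -> DATABASE_FILES
    compose_content=(
        "services:\n"
        "  app:\n"
        "    image: nginx\n"
        "    volumes:\n"
        "      - data:/data\n"          # volume -> PERSISTENT_SERVICES
    ),
)

print(risks["overall_risk"])                 # "MEDIUM" for these inputs
print(assessor.calculate_risk_score(risks))  # 15 + 10 + 20 + 10 = 55 under the weights above
print(assessor.generate_mitigation_plan(risks)["pre_migration"])
print(assessor.assess_rollback_feasibility(risks, {"remove_source": False}))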
