# bluegreen.py • 29 kB
"""
Blue/Green Deployment System for Katamari Components.
Provides parallel deployment capabilities with state synchronization,
heuristic protection, and seamless traffic switching.
"""
import asyncio
import logging
import json
import time
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any, Optional, List, Tuple, Set
from dataclasses import dataclass, asdict, field
from enum import Enum
import shutil
import tempfile
from .sandbox import CapabilitySandbox, SandboxConfig, SandboxResult
from .heuristics import HeuristicProfile, HeuristicTags, DataExposure, Complexity
from .git_tracker import GitTracker
from ..utils.config import Config
logger = logging.getLogger(__name__)
class DeploymentStatus(Enum):
    """Lifecycle states of a deployed component instance.

    Each member's value is the lowercase string emitted when an instance is
    serialized with ``DeploymentInstance.to_dict``.
    """

    PENDING = "pending"
    DEPLOYING = "deploying"          # green instance created, not yet verified
    VERIFYING = "verifying"          # verification checks are running
    ACTIVE = "active"                # eligible to be a traffic-switch target
    STANDBY = "standby"              # blue instance once a green one is synced
    DEPRECATED = "deprecated"
    FAILED = "failed"                # verification raised
    ROLLING_BACK = "rolling_back"
class InstanceType(Enum):
    """Role an instance plays within a blue/green pair."""

    # Production instance.
    BLUE = "blue"
    # New instance being deployed.
    GREEN = "green"
    # Previous instance.
    OBSOLETE = "obsolete"
@dataclass
class ComponentVersion:
    """Immutable-by-convention record describing one version of a component.

    ``timestamp`` is a ``datetime`` in memory and an ISO 8601 string in the
    dictionary form produced by :meth:`to_dict`.
    """

    version: str
    git_commit: str
    git_branch: str
    timestamp: datetime
    author: str
    message: str
    file_hash: str
    capabilities: List[str] = field(default_factory=list)
    dependencies: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return the version as a plain dict with an ISO 8601 timestamp."""
        data = asdict(self)
        data['timestamp'] = self.timestamp.isoformat()
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ComponentVersion':
        """Build a version from the dict form produced by :meth:`to_dict`.

        The input mapping is left untouched, so the same dict can be parsed
        more than once. A ``timestamp`` that is already a ``datetime`` is
        accepted as-is.

        Raises:
            KeyError: if ``data`` has no ``'timestamp'`` entry.
            TypeError: if ``data`` has keys that are not dataclass fields or
                lacks a required one.
        """
        # Work on a shallow copy: overwriting 'timestamp' in the caller's dict
        # would turn its string into a datetime behind the caller's back.
        fields = dict(data)
        timestamp = fields['timestamp']
        if isinstance(timestamp, str):
            fields['timestamp'] = datetime.fromisoformat(timestamp)
        return cls(**fields)
@dataclass
class DeploymentInstance:
    """One deployed instance of a component together with its bookkeeping."""

    instance_id: str
    component_name: str
    version: ComponentVersion
    instance_type: InstanceType
    status: DeploymentStatus
    sandbox: CapabilitySandbox
    heuristics: HeuristicProfile
    created_at: datetime
    last_health_check: Optional[datetime] = None
    health_score: float = 1.0
    traffic_percentage: float = 0.0
    state_data: Dict[str, Any] = field(default_factory=dict)
    metrics: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a snapshot of the instance suitable for serialization.

        Enums are reduced to their values and datetimes to ISO 8601 strings.
        The sandbox is not included. ``state_data`` and ``metrics`` are
        returned by reference, not copied.
        """
        checked_at = self.last_health_check
        snapshot: Dict[str, Any] = {
            'instance_id': self.instance_id,
            'component_name': self.component_name,
            'version': self.version.to_dict(),
            'instance_type': self.instance_type.value,
            'status': self.status.value,
            'created_at': self.created_at.isoformat(),
        }
        snapshot['last_health_check'] = None if not checked_at else checked_at.isoformat()
        snapshot['health_score'] = self.health_score
        snapshot['traffic_percentage'] = self.traffic_percentage
        snapshot['state_data'] = self.state_data
        snapshot['metrics'] = self.metrics
        snapshot['heuristics'] = self.heuristics.to_dict()
        return snapshot
@dataclass
class StateSyncOperation:
    """Audit record for one state copy between two instances."""

    # Instance ids of the two ends of the copy.
    source_instance: str
    target_instance: str
    # One of 'full', 'incremental', 'selective'.
    sync_type: str
    timestamp: datetime
    # Starts as "pending"; the deployer sets "completed" or "failed".
    status: str = "pending"
    # Text of the exception when the sync failed, otherwise None.
    error: Optional[str] = None
    # State keys that were actually copied to the target.
    synced_keys: List[str] = field(default_factory=list)
class BlueGreenDeployer:
    """Manages blue/green deployments of components.

    Keeps in-memory registries of deployed instances, per-component version
    history, per-component traffic routing weights and a log of state-sync
    operations, and runs a background task that periodically health-checks
    ACTIVE and STANDBY instances.
    """
def __init__(self, config: Optional[Config] = None):
    """Create empty registries and ensure the deployment directory exists.

    Args:
        config: Configuration source; a default ``Config()`` is built when
            omitted (or falsy). Its ``deployment_root`` entry selects the
            deployment directory, defaulting to ``/tmp/katamari_deployments``.
    """
    self.config = config if config else Config()
    self.git_tracker = GitTracker()

    # instance_id -> instance, and component -> versions in creation order
    self.deployments: Dict[str, DeploymentInstance] = {}
    self.component_history: Dict[str, List[ComponentVersion]] = {}

    # State bookkeeping and the log of every sync attempt
    self.state_store: Dict[str, Dict[str, Any]] = {}
    self.sync_operations: List[StateSyncOperation] = []

    # component -> {instance_id: percentage}
    self.traffic_router: Dict[str, Dict[str, float]] = {}

    # The monitor task is created lazily on the first deployment
    self.health_monitor_task: Optional[asyncio.Task] = None
    self.monitoring_enabled = True

    # Directory that holds deployments; created on construction if missing
    configured_root = self.config.get('deployment_root', '/tmp/katamari_deployments')
    self.deployment_root = Path(configured_root)
    self.deployment_root.mkdir(parents=True, exist_ok=True)

    logger.info("Blue/Green deployer initialized")
async def deploy_component(self, component_name: str, component_code: str,
                           heuristics: Optional[HeuristicProfile] = None,
                           git_commit: Optional[str] = None) -> str:
    """Deploy ``component_code`` as a new instance of ``component_name``.

    The first deployment of a component becomes its blue instance; any
    later deployment is created as a green instance next to it.

    Args:
        component_name: Name the instance is registered under.
        component_code: Source code of the component.
        heuristics: Profile for the new instance; the creation helpers fall
            back to a default profile when omitted.
        git_commit: Commit to record; the tracker's current commit is used
            when omitted.

    Returns:
        The id of the newly created instance.

    Raises:
        Exception: whatever the underlying steps raise, after it is logged.
    """
    logger.info(f"Starting blue/green deployment for {component_name}")
    try:
        # Fall back to whatever commit the tracker reports as current
        if not git_commit:
            git_info = await self.git_tracker.get_current_commit_info()
            git_commit = git_info['commit']

        version = await self._create_component_version(
            component_name, component_code, git_commit or 'unknown'
        )

        # Existing instances mean a green deployment; otherwise this is the
        # initial blue one.
        if self._get_component_instances(component_name):
            create_instance = self._create_green_deployment
        else:
            create_instance = self._create_initial_deployment
        instance_id = await create_instance(
            component_name, component_code, version, heuristics
        )

        logger.info(f"Component {component_name} deployed as instance {instance_id}")
        return instance_id
    except Exception as e:
        logger.error(f"Failed to deploy {component_name}: {e}")
        raise
async def _create_component_version(self, component_name: str, component_code: str,
                                    git_commit: str) -> ComponentVersion:
    """Build the version record for ``component_code`` and append it to history.

    The version label is ``v<unix seconds>``; the file hash is the SHA-256 of
    the UTF-8 encoded code. Branch, author and message come from the git
    tracker, defaulting to ``'main'``, ``'unknown'`` and ``''``.
    """
    commit_info = await self.git_tracker.get_commit_info(git_commit)
    code_digest = hashlib.sha256(component_code.encode()).hexdigest()
    capability_names = await self._extract_capabilities(component_code)

    version = ComponentVersion(
        version=f"v{int(time.time())}",
        git_commit=git_commit,
        git_branch=commit_info.get('branch', 'main'),
        timestamp=datetime.now(timezone.utc),
        author=commit_info.get('author', 'unknown'),
        message=commit_info.get('message', ''),
        file_hash=code_digest,
        capabilities=capability_names
    )

    # History is per component, in creation order
    self.component_history.setdefault(component_name, []).append(version)
    return version
async def _create_initial_deployment(self, component_name: str, component_code: str,
                                     version: ComponentVersion,
                                     heuristics: Optional[HeuristicProfile]) -> str:
    """Create the first (blue) instance of a component and route all traffic to it.

    Also starts the background health monitor if it is not running yet.

    Returns:
        The id of the new blue instance.

    Raises:
        ValueError: if the sandbox reports validation issues for the code.
    """
    instance_id = f"{component_name}-blue-{int(time.time())}"

    # Reject the code before anything is registered
    sandbox = CapabilitySandbox()
    validation_issues = await sandbox.validate_capability(component_code)
    if validation_issues:
        raise ValueError(f"Component validation failed: {validation_issues}")

    # A blue instance is live immediately and owns all traffic
    self.deployments[instance_id] = DeploymentInstance(
        instance_id=instance_id,
        component_name=component_name,
        version=version,
        instance_type=InstanceType.BLUE,
        status=DeploymentStatus.ACTIVE,
        sandbox=sandbox,
        heuristics=heuristics if heuristics else HeuristicProfile.default(),
        created_at=datetime.now(timezone.utc),
        traffic_percentage=100.0
    )
    self.traffic_router[component_name] = {instance_id: 100.0}

    # One monitor task serves every component
    if not self.health_monitor_task:
        self.health_monitor_task = asyncio.create_task(self._health_monitor_loop())

    logger.info(f"Created initial blue deployment: {instance_id}")
    return instance_id
async def _create_green_deployment(self, component_name: str, component_code: str,
                                   version: ComponentVersion,
                                   heuristics: Optional[HeuristicProfile]) -> str:
    """Create a green instance next to the current blue one and verify it.

    The green instance is registered with no traffic, receives the blue
    instance's compatible state, and is then verified. Traffic is not moved
    here; that is done by ``switch_traffic``.

    Returns:
        The id of the new green instance.

    Raises:
        ValueError: if the component has no blue instance, or the sandbox
            reports validation issues for the code.
        Exception: whatever state sync or verification raises. In that case
            the green instance is marked FAILED and the blue instance gets
            back the status it had before this call.
    """
    instance_id = f"{component_name}-green-{int(time.time())}"

    # Get current blue instance
    blue_instance = self._get_blue_instance(component_name)
    if not blue_instance:
        raise ValueError(f"No blue instance found for {component_name}")

    # Create sandbox for green instance and validate the new component
    sandbox = CapabilitySandbox()
    validation_issues = await sandbox.validate_capability(component_code)
    if validation_issues:
        raise ValueError(f"Green deployment validation failed: {validation_issues}")

    # Set heuristics
    if not heuristics:
        heuristics = HeuristicProfile.default()

    # Create and register the green instance
    green_instance = DeploymentInstance(
        instance_id=instance_id,
        component_name=component_name,
        version=version,
        instance_type=InstanceType.GREEN,
        status=DeploymentStatus.DEPLOYING,
        sandbox=sandbox,
        heuristics=heuristics,
        created_at=datetime.now(timezone.utc),
        traffic_percentage=0.0
    )
    self.deployments[instance_id] = green_instance

    # Blue keeps serving traffic throughout, so remember its status in order
    # to put it back if the green instance never becomes usable.
    previous_blue_status = blue_instance.status
    try:
        # Sync state from blue to green
        await self._sync_state_to_green(blue_instance, green_instance)

        # Update blue instance status
        blue_instance.status = DeploymentStatus.STANDBY

        # Start verification
        await self._verify_green_deployment(green_instance)
    except Exception:
        # Without this, a failed sync left green in DEPLOYING forever and a
        # failed verification left blue in STANDBY while still carrying
        # all traffic (and therefore not a valid switch_traffic target).
        green_instance.status = DeploymentStatus.FAILED
        blue_instance.status = previous_blue_status
        raise

    logger.info(f"Created green deployment: {instance_id}")
    return instance_id
async def _sync_state_to_green(self, blue_instance: DeploymentInstance,
                               green_instance: DeploymentInstance) -> None:
    """Copy compatible state and heuristics learning data from blue to green.

    Every attempt, successful or not, is appended to ``self.sync_operations``
    as a ``"full"`` sync record.

    Raises:
        Exception: whatever filtering or copying raises, after the record is
            marked ``"failed"`` with the error text.
    """
    logger.info(f"Syncing state from {blue_instance.instance_id} to {green_instance.instance_id}")

    sync_op = StateSyncOperation(
        blue_instance.instance_id,
        green_instance.instance_id,
        "full",
        datetime.now(timezone.utc),
    )

    try:
        # Filter a shallow copy of blue's state against green's heuristics
        compatible_state = await self._filter_compatible_state(
            blue_instance.state_data.copy(),
            blue_instance.heuristics,
            green_instance.heuristics,
        )
        green_instance.state_data.update(compatible_state)

        # Learning data travels separately from the key/value state
        await self._sync_heuristics_data(blue_instance.heuristics, green_instance.heuristics)

        sync_op.synced_keys = list(compatible_state.keys())
        sync_op.status = "completed"
        logger.info(f"State sync completed: {len(compatible_state)} keys synced")
    except Exception as e:
        sync_op.error = str(e)
        sync_op.status = "failed"
        logger.error(f"State sync failed: {e}")
        raise
    finally:
        # Recorded on both the success and the failure path
        self.sync_operations.append(sync_op)
async def _filter_compatible_state(self, state: Dict[str, Any],
                                   source_heuristics: HeuristicProfile,
                                   target_heuristics: HeuristicProfile) -> Dict[str, Any]:
    """Return the entries of ``state`` that the target heuristics accept.

    Rejected keys are logged at warning level. ``source_heuristics`` is
    accepted but not consulted.
    """
    accepted: Dict[str, Any] = {}
    for key, value in state.items():
        if not await self._is_state_compatible(key, value, target_heuristics):
            logger.warning(f"State key {key} not compatible with target heuristics")
            continue
        accepted[key] = value
    return accepted
async def _is_state_compatible(self, key: str, value: Any,
                               heuristics: HeuristicProfile) -> bool:
    """Decide whether one state entry may be copied under ``heuristics``.

    Rules:
        * With ``DataExposure.NONE``, keys starting with ``external_`` or
          ``public_`` are rejected.
        * With ``Complexity.SIMPLE``, dicts/lists longer than 100 items are
          rejected, as is any value whose ``str()`` form exceeds 1 MiB
          characters (complexity stands in for a resource limit for now).
    """
    max_items = 100
    max_chars = 1024 * 1024  # 1MB

    # No data exposure allowed - only internal state may pass
    exposed_key = key.startswith(('external_', 'public_'))
    if heuristics.data_exposure == DataExposure.NONE and exposed_key:
        return False

    if heuristics.complexity == Complexity.SIMPLE:
        # Limit container size first, then the overall rendered size
        if isinstance(value, (dict, list)) and len(value) > max_items:
            return False
        if len(str(value)) > max_chars:
            return False

    return True
async def _sync_heuristics_data(self, source_heuristics: HeuristicProfile,
                                target_heuristics: HeuristicProfile) -> None:
    """Copy learning data from the source profile onto the target profile.

    Performance history, learned patterns and feedback history are each
    copied (via the attribute's own ``copy()``) only when the source profile
    has that attribute.
    """
    learning_attributes = (
        'performance_history',
        'learned_patterns',
        'feedback_history',
    )
    for attribute in learning_attributes:
        if hasattr(source_heuristics, attribute):
            setattr(target_heuristics, attribute, getattr(source_heuristics, attribute).copy())
async def _verify_green_deployment(self, green_instance: DeploymentInstance) -> None:
    """Run the verification gates on a green instance.

    The instance is VERIFYING while the gates run, ACTIVE once all pass and
    FAILED if any gate raises. Gates, in order: health score of at least
    0.8, functional tests, performance validation.

    Raises:
        ValueError: if a gate reports failure.
        Exception: anything a gate itself raises.
    """
    logger.info(f"Verifying green deployment: {green_instance.instance_id}")
    green_instance.status = DeploymentStatus.VERIFYING

    try:
        # Gate 1: health
        health_score = await self._perform_health_check(green_instance)
        green_instance.health_score = health_score
        if not health_score >= 0.8:
            raise ValueError(f"Health check failed: score {health_score}")

        # Gate 2: functional tests
        test_results = await self._run_functional_tests(green_instance)
        if not test_results['passed']:
            raise ValueError(f"Functional tests failed: {test_results['errors']}")

        # Gate 3: performance
        perf_results = await self._validate_performance(green_instance)
        if not perf_results['acceptable']:
            raise ValueError(f"Performance validation failed: {perf_results['issues']}")

        # All gates passed: the instance may now receive traffic
        green_instance.status = DeploymentStatus.ACTIVE
        logger.info(f"Green deployment verified: {green_instance.instance_id}")
    except Exception as e:
        green_instance.status = DeploymentStatus.FAILED
        logger.error(f"Green deployment verification failed: {e}")
        raise
async def switch_traffic(self, component_name: str, target_instance_id: str,
                         gradual: bool = True, duration_seconds: int = 300) -> None:
    """Move a component's traffic onto ``target_instance_id``.

    Args:
        component_name: Component whose routing is changed.
        target_instance_id: Instance that ends up with all traffic.
        gradual: Shift traffic in steps when True, in one step when False.
        duration_seconds: Total time a gradual switch is spread over.

    Raises:
        ValueError: if the target instance is unknown, belongs to a
            different component, or is not ACTIVE.
    """
    logger.info(f"Switching traffic for {component_name} to {target_instance_id}")

    target_instance = self.deployments.get(target_instance_id)
    if not target_instance:
        raise ValueError(f"Instance {target_instance_id} not found")

    # Routing is keyed by component; pointing it at another component's
    # instance would send this component's traffic to unrelated code.
    if target_instance.component_name != component_name:
        raise ValueError(
            f"Instance {target_instance_id} does not belong to component {component_name}"
        )

    if target_instance.status != DeploymentStatus.ACTIVE:
        raise ValueError(f"Instance {target_instance_id} not active")

    current_routing = self.traffic_router.get(component_name, {})

    if gradual:
        # Gradual traffic switch
        await self._gradual_traffic_switch(
            component_name, current_routing, target_instance_id, duration_seconds
        )
    else:
        # Immediate switch
        await self._immediate_traffic_switch(
            component_name, current_routing, target_instance_id
        )

    # The target becomes blue; the previous blue becomes obsolete
    await self._update_instance_types_after_switch(component_name, target_instance_id)

    logger.info(f"Traffic switch completed for {component_name}")
async def _gradual_traffic_switch(self, component_name: str,
                                  current_routing: Dict[str, float],
                                  target_instance_id: str,
                                  duration_seconds: int) -> None:
    """Shift traffic to the target in ten equal steps over ``duration_seconds``.

    At each step the target gets ``step * 10`` percent and the remainder is
    split evenly between the other instances present in ``current_routing``.
    After the last step the routing table contains only the target, and
    every drained instance has ``traffic_percentage`` set to 0.0.
    """
    steps = 10
    step_duration = duration_seconds / steps

    # The set of instances being drained does not change between steps
    other_instances = [iid for iid in current_routing if iid != target_instance_id]

    for step in range(steps + 1):
        percentage = (step / steps) * 100
        remaining_percentage = 100 - percentage

        # Update routing
        new_routing = {target_instance_id: percentage}
        if other_instances and remaining_percentage > 0:
            per_instance = remaining_percentage / len(other_instances)
            for iid in other_instances:
                new_routing[iid] = per_instance

        self.traffic_router[component_name] = new_routing

        # Update instance traffic percentages. Drained instances are absent
        # from new_routing at the final step, so default them to zero
        # instead of leaving the previous step's share on the instance.
        for iid in [target_instance_id, *other_instances]:
            instance = self.deployments.get(iid)
            if instance is not None:
                instance.traffic_percentage = new_routing.get(iid, 0.0)

        logger.info(f"Traffic step {step}/{steps}: {target_instance_id} at {percentage:.1f}%")

        # No pause after the final step
        if step < steps:
            await asyncio.sleep(step_duration)
async def _immediate_traffic_switch(self, component_name: str,
current_routing: Dict[str, float],
target_instance_id: str) -> None:
"""Immediately switch all traffic to target instance."""
new_routing = {target_instance_id: 100.0}
self.traffic_router[component_name] = new_routing
# Update all instance traffic percentages
for iid in self.deployments:
if self.deployments[iid].component_name == component_name:
self.deployments[iid].traffic_percentage = 100.0 if iid == target_instance_id else 0.0
async def _update_instance_types_after_switch(self, component_name: str,
                                              new_primary_instance_id: str) -> None:
    """Relabel a component's instances once traffic has moved.

    The new primary becomes BLUE and any other instance that was BLUE
    becomes OBSOLETE. Instances of other types are left unchanged.
    """
    for instance_id, instance in self.deployments.items():
        if instance.component_name != component_name:
            continue
        if instance_id == new_primary_instance_id:
            instance.instance_type = InstanceType.BLUE
        elif instance.instance_type == InstanceType.BLUE:
            instance.instance_type = InstanceType.OBSOLETE
async def rollback_deployment(self, component_name: str,
                              target_version: Optional[str] = None) -> str:
    """Redeploy an earlier version and switch all traffic to it immediately.

    Args:
        component_name: Component to roll back.
        target_version: Version label to return to; when omitted, the
            second most recent entry in the component's history is used.

    Returns:
        The id of the instance created for the rollback.

    Raises:
        ValueError: if the component has no instances, the requested version
            is not in its history, or there is no previous version.
    """
    logger.info(f"Rolling back deployment for {component_name}")

    if not self._get_component_instances(component_name):
        raise ValueError(f"No instances found for {component_name}")

    # Pick the version to return to
    history = self.component_history.get(component_name, [])
    if target_version:
        rollback_version = next(
            (candidate for candidate in history if candidate.version == target_version),
            None,
        )
        if not rollback_version:
            raise ValueError(f"Version {target_version} not found in history")
    else:
        if len(history) < 2:
            raise ValueError("No previous version available for rollback")
        rollback_version = history[-2]

    # Redeploy that version's code through the normal deployment path
    rollback_code = await self._get_version_code(rollback_version)
    rollback_instance_id = await self.deploy_component(
        component_name, rollback_code, None, rollback_version.git_commit
    )

    # A rollback is not eased in: cut over in one step
    await self.switch_traffic(component_name, rollback_instance_id, gradual=False)

    logger.info(f"Rollback completed: {rollback_instance_id}")
    return rollback_instance_id
async def _get_version_code(self, version: ComponentVersion) -> str:
    """Fetch a version's source from git.

    Reads ``capabilities/<version label>.py`` at the version's commit via
    the git tracker.
    """
    source_path = f"capabilities/{version.version}.py"
    source = await self.git_tracker.get_file_content_at_commit(
        version.git_commit, source_path
    )
    return source
def _get_component_instances(self, component_name: str) -> List[DeploymentInstance]:
    """Return every registered instance of ``component_name``, in registry order."""
    matching: List[DeploymentInstance] = []
    for instance in self.deployments.values():
        if instance.component_name == component_name:
            matching.append(instance)
    return matching
def _get_blue_instance(self, component_name: str) -> Optional[DeploymentInstance]:
    """Return the first BLUE (production) instance of the component, or None."""
    blue_candidates = (
        instance for instance in self.deployments.values()
        if instance.component_name == component_name
        and instance.instance_type == InstanceType.BLUE
    )
    return next(blue_candidates, None)
async def _extract_capabilities(self, component_code: str) -> List[str]:
"""Extract capability names from component code."""
capabilities = []
# Simple extraction - look for function definitions
import re
func_matches = re.findall(r'^def\s+(\w+)\s*\(', component_code, re.MULTILINE)
capabilities.extend(func_matches)
# Look for class definitions
class_matches = re.findall(r'^class\s+(\w+)\s*:', component_code, re.MULTILINE)
capabilities.extend(class_matches)
return list(set(capabilities))
async def _perform_health_check(self, instance: DeploymentInstance) -> float:
    """Run a trivial probe in the instance's sandbox and score the result.

    Returns:
        1.0 when the sandbox reports success, 0.5 when it reports failure,
        and 0.0 when running the probe raises (the error is logged).
    """
    # The probe is built line by line so its indentation is explicit: the
    # body of health_check() must be indented for the snippet to be valid
    # Python, otherwise every check fails inside the sandbox.
    test_code = (
        '\n'
        'def health_check():\n'
        '    return {"status": "healthy", "timestamp": "2024-01-01T00:00:00Z"}\n'
        'result = health_check()\n'
    )
    try:
        # Basic health check - try to execute a simple test
        result = await instance.sandbox.execute_capability(test_code, "health_check")
        return 1.0 if result.success else 0.5
    except Exception as e:
        # Best effort: a broken sandbox counts as an unhealthy instance
        logger.warning(f"Health check failed for {instance.instance_id}: {e}")
        return 0.0
async def _run_functional_tests(self, instance: DeploymentInstance) -> Dict[str, Any]:
    """Placeholder for component-specific functional tests.

    Currently always reports success with an empty error list.
    """
    # This would run component-specific tests; for now every instance passes
    outcome: Dict[str, Any] = {"passed": True, "errors": []}
    return outcome
async def _validate_performance(self, instance: DeploymentInstance) -> Dict[str, Any]:
    """Placeholder for performance benchmarks.

    Currently always reports acceptable performance with no issues.
    """
    # This would run performance benchmarks; for now every instance is accepted
    outcome: Dict[str, Any] = {"acceptable": True, "issues": []}
    return outcome
async def _health_monitor_loop(self) -> None:
    """Background loop that health-checks ACTIVE and STANDBY instances.

    Runs while ``self.monitoring_enabled`` is true. Each round records the
    score and check time on the instance and in its metrics, then sleeps 30
    seconds. If a round raises, the error is logged and the loop sleeps 60
    seconds before trying again.
    """
    monitored_statuses = (DeploymentStatus.ACTIVE, DeploymentStatus.STANDBY)
    while self.monitoring_enabled:
        try:
            # Iterate over a snapshot: each health check awaits, and a
            # deployment or cleanup running meanwhile would otherwise raise
            # "dictionary changed size during iteration" and abort the round.
            for instance in list(self.deployments.values()):
                if instance.status in monitored_statuses:
                    health_score = await self._perform_health_check(instance)
                    instance.health_score = health_score
                    instance.last_health_check = datetime.now(timezone.utc)

                    # Update metrics
                    instance.metrics['last_health_score'] = health_score

            await asyncio.sleep(30)  # Check every 30 seconds

        except Exception as e:
            logger.error(f"Health monitor error: {e}")
            await asyncio.sleep(60)
async def get_deployment_status(self, component_name: str) -> Dict[str, Any]:
    """Summarize a component's instances and routing.

    Returns:
        A dict with the component name, the serialized instances, the
        current routing table (empty when the component has none), the
        number of instances and how many of them are ACTIVE.
    """
    instances = self._get_component_instances(component_name)
    active_count = sum(
        1 for instance in instances if instance.status == DeploymentStatus.ACTIVE
    )
    serialized = [instance.to_dict() for instance in instances]

    return {
        'component_name': component_name,
        'instances': serialized,
        'traffic_routing': self.traffic_router.get(component_name, {}),
        'total_instances': len(instances),
        'active_instances': active_count
    }
async def cleanup_deprecated_instances(self, max_age_hours: int = 24) -> int:
    """Drop OBSOLETE instances created more than ``max_age_hours`` ago.

    Only the registry entry is removed.

    Returns:
        The number of instances removed.
    """
    seconds_per_hour = 3600
    cutoff_time = datetime.now(timezone.utc).timestamp() - (max_age_hours * seconds_per_hour)

    # Collect ids first so the registry is not modified while it is iterated
    deprecated_instances = [
        instance_id
        for instance_id, instance in self.deployments.items()
        if instance.instance_type == InstanceType.OBSOLETE
        and instance.created_at.timestamp() < cutoff_time
    ]

    for instance_id in deprecated_instances:
        del self.deployments[instance_id]
        logger.info(f"Cleaned up deprecated instance: {instance_id}")

    return len(deprecated_instances)
async def shutdown(self) -> None:
    """Stop health monitoring and release every instance's sandbox.

    Sandbox cleanup is best effort: a failure for one instance is logged and
    does not prevent the remaining instances from being cleaned up.
    """
    self.monitoring_enabled = False

    if self.health_monitor_task:
        self.health_monitor_task.cancel()
        try:
            await self.health_monitor_task
        except asyncio.CancelledError:
            # Expected: the task was cancelled just above
            pass

    # Cleanup sandboxes. Iterate over a snapshot because each cleanup awaits.
    for instance in list(self.deployments.values()):
        try:
            await instance.sandbox.sandbox._cleanup_temp_directory(
                instance.sandbox.sandbox._temp_dir
            )
        except Exception as e:
            # Catch Exception rather than everything, so cancellation and
            # interpreter-exit signals still propagate, and leave a trace
            # of what could not be cleaned up.
            logger.warning(f"Failed to clean up sandbox for {instance.instance_id}: {e}")

    logger.info("Blue/Green deployer shutdown completed")