Skip to main content
Glama

Katamari MCP Server

by ciphernaut
bluegreen.pyโ€ข29 kB
""" Blue/Green Deployment System for Katamari Components. Provides parallel deployment capabilities with state synchronization, heuristic protection, and seamless traffic switching. """ import asyncio import logging import json import time import hashlib from datetime import datetime, timezone from pathlib import Path from typing import Dict, Any, Optional, List, Tuple, Set from dataclasses import dataclass, asdict, field from enum import Enum import shutil import tempfile from .sandbox import CapabilitySandbox, SandboxConfig, SandboxResult from .heuristics import HeuristicProfile, HeuristicTags, DataExposure, Complexity from .git_tracker import GitTracker from ..utils.config import Config logger = logging.getLogger(__name__) class DeploymentStatus(Enum): """Deployment status enumeration.""" PENDING = "pending" DEPLOYING = "deploying" VERIFYING = "verifying" ACTIVE = "active" STANDBY = "standby" DEPRECATED = "deprecated" FAILED = "failed" ROLLING_BACK = "rolling_back" class InstanceType(Enum): """Instance type enumeration.""" BLUE = "blue" # Production instance GREEN = "green" # New instance being deployed OBSOLETE = "obsolete" # Previous instance @dataclass class ComponentVersion: """Component version information.""" version: str git_commit: str git_branch: str timestamp: datetime author: str message: str file_hash: str capabilities: List[str] = field(default_factory=list) dependencies: List[str] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" data = asdict(self) data['timestamp'] = self.timestamp.isoformat() return data @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'ComponentVersion': """Create from dictionary.""" data['timestamp'] = datetime.fromisoformat(data['timestamp']) return cls(**data) @dataclass class DeploymentInstance: """Represents a deployed component instance.""" instance_id: str component_name: str version: ComponentVersion instance_type: InstanceType status: DeploymentStatus sandbox: CapabilitySandbox heuristics: HeuristicProfile created_at: datetime last_health_check: Optional[datetime] = None health_score: float = 1.0 traffic_percentage: float = 0.0 state_data: Dict[str, Any] = field(default_factory=dict) metrics: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for serialization.""" return { 'instance_id': self.instance_id, 'component_name': self.component_name, 'version': self.version.to_dict(), 'instance_type': self.instance_type.value, 'status': self.status.value, 'created_at': self.created_at.isoformat(), 'last_health_check': self.last_health_check.isoformat() if self.last_health_check else None, 'health_score': self.health_score, 'traffic_percentage': self.traffic_percentage, 'state_data': self.state_data, 'metrics': self.metrics, 'heuristics': self.heuristics.to_dict() } @dataclass class StateSyncOperation: """State synchronization operation.""" source_instance: str target_instance: str sync_type: str # 'full', 'incremental', 'selective' timestamp: datetime status: str = "pending" error: Optional[str] = None synced_keys: List[str] = field(default_factory=list) class BlueGreenDeployer: """Manages blue/green deployments of components.""" def __init__(self, config: Optional[Config] = None): self.config = config or Config() self.git_tracker = GitTracker() # Deployment storage self.deployments: Dict[str, DeploymentInstance] = {} self.component_history: Dict[str, List[ComponentVersion]] = {} # State management self.state_store: Dict[str, Dict[str, Any]] = {} self.sync_operations: List[StateSyncOperation] = [] # Routing configuration self.traffic_router: Dict[str, Dict[str, float]] = {} # component -> {instance_id: percentage} # Health monitoring self.health_monitor_task: Optional[asyncio.Task] = None self.monitoring_enabled = True # Deployment directories self.deployment_root = Path(self.config.get('deployment_root', '/tmp/katamari_deployments')) self.deployment_root.mkdir(parents=True, exist_ok=True) logger.info("Blue/Green deployer initialized") async def deploy_component(self, component_name: str, component_code: str, heuristics: Optional[HeuristicProfile] = None, git_commit: Optional[str] = None) -> str: """Deploy a new component using blue/green strategy.""" logger.info(f"Starting blue/green deployment for {component_name}") try: # Get current git information if not git_commit: git_info = await self.git_tracker.get_current_commit_info() git_commit = git_info['commit'] # Create version information version = await self._create_component_version( component_name, component_code, git_commit or 'unknown' ) # Determine deployment strategy current_instances = self._get_component_instances(component_name) if not current_instances: # First deployment - create blue instance instance_id = await self._create_initial_deployment( component_name, component_code, version, heuristics ) else: # Blue/green deployment - create green instance instance_id = await self._create_green_deployment( component_name, component_code, version, heuristics ) logger.info(f"Component {component_name} deployed as instance {instance_id}") return instance_id except Exception as e: logger.error(f"Failed to deploy {component_name}: {e}") raise async def _create_component_version(self, component_name: str, component_code: str, git_commit: str) -> ComponentVersion: """Create version information for component.""" # Get git commit details commit_info = await self.git_tracker.get_commit_info(git_commit) # Calculate file hash file_hash = hashlib.sha256(component_code.encode()).hexdigest() # Extract capabilities from code capabilities = await self._extract_capabilities(component_code) # Create version version = ComponentVersion( version=f"v{int(time.time())}", git_commit=git_commit, git_branch=commit_info.get('branch', 'main'), timestamp=datetime.now(timezone.utc), author=commit_info.get('author', 'unknown'), message=commit_info.get('message', ''), file_hash=file_hash, capabilities=capabilities ) # Store in history if component_name not in self.component_history: self.component_history[component_name] = [] self.component_history[component_name].append(version) return version async def _create_initial_deployment(self, component_name: str, component_code: str, version: ComponentVersion, heuristics: Optional[HeuristicProfile]) -> str: """Create initial blue deployment.""" instance_id = f"{component_name}-blue-{int(time.time())}" # Create sandbox sandbox = CapabilitySandbox() # Validate and deploy validation_issues = await sandbox.validate_capability(component_code) if validation_issues: raise ValueError(f"Component validation failed: {validation_issues}") # Set default heuristics if not provided if not heuristics: heuristics = HeuristicProfile.default() # Create deployment instance instance = DeploymentInstance( instance_id=instance_id, component_name=component_name, version=version, instance_type=InstanceType.BLUE, status=DeploymentStatus.ACTIVE, sandbox=sandbox, heuristics=heuristics, created_at=datetime.now(timezone.utc), traffic_percentage=100.0 ) # Store deployment self.deployments[instance_id] = instance # Initialize routing self.traffic_router[component_name] = {instance_id: 100.0} # Start health monitoring if not self.health_monitor_task: self.health_monitor_task = asyncio.create_task(self._health_monitor_loop()) logger.info(f"Created initial blue deployment: {instance_id}") return instance_id async def _create_green_deployment(self, component_name: str, component_code: str, version: ComponentVersion, heuristics: Optional[HeuristicProfile]) -> str: """Create green deployment for blue/green strategy.""" instance_id = f"{component_name}-green-{int(time.time())}" # Get current blue instance blue_instance = self._get_blue_instance(component_name) if not blue_instance: raise ValueError(f"No blue instance found for {component_name}") # Create sandbox for green instance sandbox = CapabilitySandbox() # Validate new component validation_issues = await sandbox.validate_capability(component_code) if validation_issues: raise ValueError(f"Green deployment validation failed: {validation_issues}") # Set heuristics if not heuristics: heuristics = HeuristicProfile.default() # Create green instance green_instance = DeploymentInstance( instance_id=instance_id, component_name=component_name, version=version, instance_type=InstanceType.GREEN, status=DeploymentStatus.DEPLOYING, sandbox=sandbox, heuristics=heuristics, created_at=datetime.now(timezone.utc), traffic_percentage=0.0 ) # Store deployment self.deployments[instance_id] = green_instance # Sync state from blue to green await self._sync_state_to_green(blue_instance, green_instance) # Update blue instance status blue_instance.status = DeploymentStatus.STANDBY # Start verification await self._verify_green_deployment(green_instance) logger.info(f"Created green deployment: {instance_id}") return instance_id async def _sync_state_to_green(self, blue_instance: DeploymentInstance, green_instance: DeploymentInstance) -> None: """Synchronize state from blue to green instance.""" logger.info(f"Syncing state from {blue_instance.instance_id} to {green_instance.instance_id}") sync_op = StateSyncOperation( source_instance=blue_instance.instance_id, target_instance=green_instance.instance_id, sync_type="full", timestamp=datetime.now(timezone.utc) ) try: # Get state from blue instance blue_state = blue_instance.state_data.copy() # Filter state based on heuristics and compatibility compatible_state = await self._filter_compatible_state( blue_state, blue_instance.heuristics, green_instance.heuristics ) # Apply state to green instance green_instance.state_data.update(compatible_state) # Sync heuristics learning data await self._sync_heuristics_data(blue_instance.heuristics, green_instance.heuristics) sync_op.status = "completed" sync_op.synced_keys = list(compatible_state.keys()) logger.info(f"State sync completed: {len(compatible_state)} keys synced") except Exception as e: sync_op.status = "failed" sync_op.error = str(e) logger.error(f"State sync failed: {e}") raise finally: self.sync_operations.append(sync_op) async def _filter_compatible_state(self, state: Dict[str, Any], source_heuristics: HeuristicProfile, target_heuristics: HeuristicProfile) -> Dict[str, Any]: """Filter state for compatibility between heuristics profiles.""" compatible_state = {} for key, value in state.items(): # Check if state key is compatible with target heuristics if await self._is_state_compatible(key, value, target_heuristics): compatible_state[key] = value else: logger.warning(f"State key {key} not compatible with target heuristics") return compatible_state async def _is_state_compatible(self, key: str, value: Any, heuristics: HeuristicProfile) -> bool: """Check if state key/value is compatible with heuristics.""" # Check data exposure level if heuristics.data_exposure == DataExposure.NONE: # No data exposure allowed - only allow internal state if key.startswith('external_') or key.startswith('public_'): return False # Check complexity level if heuristics.complexity == Complexity.SIMPLE: # Simple complexity - limit state size if isinstance(value, (dict, list)) and len(value) > 100: return False # Check resource usage (using complexity as proxy for now) if heuristics.complexity == Complexity.SIMPLE: # Simple complexity - limit state memory state_size = len(str(value)) if state_size > 1024 * 1024: # 1MB return False return True async def _sync_heuristics_data(self, source_heuristics: HeuristicProfile, target_heuristics: HeuristicProfile) -> None: """Synchronize heuristics learning data.""" # Sync performance metrics if hasattr(source_heuristics, 'performance_history'): target_heuristics.performance_history = source_heuristics.performance_history.copy() # Sync learning patterns if hasattr(source_heuristics, 'learned_patterns'): target_heuristics.learned_patterns = source_heuristics.learned_patterns.copy() # Sync feedback data if hasattr(source_heuristics, 'feedback_history'): target_heuristics.feedback_history = source_heuristics.feedback_history.copy() async def _verify_green_deployment(self, green_instance: DeploymentInstance) -> None: """Verify green deployment before switching traffic.""" logger.info(f"Verifying green deployment: {green_instance.instance_id}") green_instance.status = DeploymentStatus.VERIFYING try: # Health checks health_score = await self._perform_health_check(green_instance) green_instance.health_score = health_score if health_score < 0.8: raise ValueError(f"Health check failed: score {health_score}") # Functional tests test_results = await self._run_functional_tests(green_instance) if not test_results['passed']: raise ValueError(f"Functional tests failed: {test_results['errors']}") # Performance validation perf_results = await self._validate_performance(green_instance) if not perf_results['acceptable']: raise ValueError(f"Performance validation failed: {perf_results['issues']}") # Mark as ready for traffic green_instance.status = DeploymentStatus.ACTIVE logger.info(f"Green deployment verified: {green_instance.instance_id}") except Exception as e: green_instance.status = DeploymentStatus.FAILED logger.error(f"Green deployment verification failed: {e}") raise async def switch_traffic(self, component_name: str, target_instance_id: str, gradual: bool = True, duration_seconds: int = 300) -> None: """Switch traffic to target instance.""" logger.info(f"Switching traffic for {component_name} to {target_instance_id}") target_instance = self.deployments.get(target_instance_id) if not target_instance: raise ValueError(f"Instance {target_instance_id} not found") if target_instance.status != DeploymentStatus.ACTIVE: raise ValueError(f"Instance {target_instance_id} not active") current_routing = self.traffic_router.get(component_name, {}) if gradual: # Gradual traffic switch await self._gradual_traffic_switch( component_name, current_routing, target_instance_id, duration_seconds ) else: # Immediate switch await self._immediate_traffic_switch( component_name, current_routing, target_instance_id ) # Update instance types await self._update_instance_types_after_switch(component_name, target_instance_id) logger.info(f"Traffic switch completed for {component_name}") async def _gradual_traffic_switch(self, component_name: str, current_routing: Dict[str, float], target_instance_id: str, duration_seconds: int) -> None: """Gradually switch traffic over time.""" steps = 10 step_duration = duration_seconds / steps for step in range(steps + 1): percentage = (step / steps) * 100 # Update routing new_routing = {target_instance_id: percentage} # Distribute remaining traffic to other instances remaining_percentage = 100 - percentage other_instances = [iid for iid in current_routing.keys() if iid != target_instance_id] if other_instances and remaining_percentage > 0: per_instance = remaining_percentage / len(other_instances) for iid in other_instances: new_routing[iid] = per_instance self.traffic_router[component_name] = new_routing # Update instance traffic percentages for iid, pct in new_routing.items(): if iid in self.deployments: self.deployments[iid].traffic_percentage = pct logger.info(f"Traffic step {step}/{steps}: {target_instance_id} at {percentage:.1f}%") if step < steps: await asyncio.sleep(step_duration) async def _immediate_traffic_switch(self, component_name: str, current_routing: Dict[str, float], target_instance_id: str) -> None: """Immediately switch all traffic to target instance.""" new_routing = {target_instance_id: 100.0} self.traffic_router[component_name] = new_routing # Update all instance traffic percentages for iid in self.deployments: if self.deployments[iid].component_name == component_name: self.deployments[iid].traffic_percentage = 100.0 if iid == target_instance_id else 0.0 async def _update_instance_types_after_switch(self, component_name: str, new_primary_instance_id: str) -> None: """Update instance types after traffic switch.""" for instance_id, instance in self.deployments.items(): if instance.component_name == component_name: if instance_id == new_primary_instance_id: instance.instance_type = InstanceType.BLUE elif instance.instance_type == InstanceType.BLUE: instance.instance_type = InstanceType.OBSOLETE async def rollback_deployment(self, component_name: str, target_version: Optional[str] = None) -> str: """Rollback to previous version.""" logger.info(f"Rolling back deployment for {component_name}") # Get current instances current_instances = self._get_component_instances(component_name) if not current_instances: raise ValueError(f"No instances found for {component_name}") # Find target version for rollback if target_version: rollback_version = None for version in self.component_history.get(component_name, []): if version.version == target_version: rollback_version = version break if not rollback_version: raise ValueError(f"Version {target_version} not found in history") else: # Rollback to previous version versions = self.component_history.get(component_name, []) if len(versions) < 2: raise ValueError("No previous version available for rollback") rollback_version = versions[-2] # Get the code for rollback version rollback_code = await self._get_version_code(rollback_version) # Create rollback instance rollback_instance_id = await self.deploy_component( component_name, rollback_code, None, rollback_version.git_commit ) # Switch traffic immediately to rollback instance await self.switch_traffic(component_name, rollback_instance_id, gradual=False) logger.info(f"Rollback completed: {rollback_instance_id}") return rollback_instance_id async def _get_version_code(self, version: ComponentVersion) -> str: """Get component code for specific version.""" # Retrieve from git return await self.git_tracker.get_file_content_at_commit( version.git_commit, f"capabilities/{version.version}.py" ) def _get_component_instances(self, component_name: str) -> List[DeploymentInstance]: """Get all instances for a component.""" return [ instance for instance in self.deployments.values() if instance.component_name == component_name ] def _get_blue_instance(self, component_name: str) -> Optional[DeploymentInstance]: """Get the blue (production) instance for a component.""" for instance in self.deployments.values(): if (instance.component_name == component_name and instance.instance_type == InstanceType.BLUE): return instance return None async def _extract_capabilities(self, component_code: str) -> List[str]: """Extract capability names from component code.""" capabilities = [] # Simple extraction - look for function definitions import re func_matches = re.findall(r'^def\s+(\w+)\s*\(', component_code, re.MULTILINE) capabilities.extend(func_matches) # Look for class definitions class_matches = re.findall(r'^class\s+(\w+)\s*:', component_code, re.MULTILINE) capabilities.extend(class_matches) return list(set(capabilities)) async def _perform_health_check(self, instance: DeploymentInstance) -> float: """Perform health check on instance.""" try: # Basic health check - try to execute a simple test test_code = ''' def health_check(): return {"status": "healthy", "timestamp": "2024-01-01T00:00:00Z"} result = health_check() ''' result = await instance.sandbox.execute_capability(test_code, "health_check") if result.success: return 1.0 else: return 0.5 except Exception as e: logger.warning(f"Health check failed for {instance.instance_id}: {e}") return 0.0 async def _run_functional_tests(self, instance: DeploymentInstance) -> Dict[str, Any]: """Run functional tests on instance.""" # This would run component-specific tests # For now, return a basic pass return {"passed": True, "errors": []} async def _validate_performance(self, instance: DeploymentInstance) -> Dict[str, Any]: """Validate performance of instance.""" # This would run performance benchmarks # For now, return basic acceptance return {"acceptable": True, "issues": []} async def _health_monitor_loop(self) -> None: """Background health monitoring loop.""" while self.monitoring_enabled: try: for instance in self.deployments.values(): if instance.status in [DeploymentStatus.ACTIVE, DeploymentStatus.STANDBY]: health_score = await self._perform_health_check(instance) instance.health_score = health_score instance.last_health_check = datetime.now(timezone.utc) # Update metrics instance.metrics['last_health_score'] = health_score await asyncio.sleep(30) # Check every 30 seconds except Exception as e: logger.error(f"Health monitor error: {e}") await asyncio.sleep(60) async def get_deployment_status(self, component_name: str) -> Dict[str, Any]: """Get deployment status for component.""" instances = self._get_component_instances(component_name) routing = self.traffic_router.get(component_name, {}) return { 'component_name': component_name, 'instances': [instance.to_dict() for instance in instances], 'traffic_routing': routing, 'total_instances': len(instances), 'active_instances': len([i for i in instances if i.status == DeploymentStatus.ACTIVE]) } async def cleanup_deprecated_instances(self, max_age_hours: int = 24) -> int: """Clean up deprecated instances older than max_age_hours.""" cutoff_time = datetime.now(timezone.utc).timestamp() - (max_age_hours * 3600) deprecated_instances = [] for instance_id, instance in self.deployments.items(): if (instance.instance_type == InstanceType.OBSOLETE and instance.created_at.timestamp() < cutoff_time): deprecated_instances.append(instance_id) # Remove deprecated instances for instance_id in deprecated_instances: del self.deployments[instance_id] logger.info(f"Cleaned up deprecated instance: {instance_id}") return len(deprecated_instances) async def shutdown(self) -> None: """Shutdown the blue/green deployer.""" self.monitoring_enabled = False if self.health_monitor_task: self.health_monitor_task.cancel() try: await self.health_monitor_task except asyncio.CancelledError: pass # Cleanup sandboxes for instance in self.deployments.values(): try: await instance.sandbox.sandbox._cleanup_temp_directory( instance.sandbox.sandbox._temp_dir ) except: pass logger.info("Blue/Green deployer shutdown completed")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server