process_manager.py
""" Process Manager for Tiger API Process Pool. Coordinates the process pool, manages account-to-process mapping, monitors process health, and handles resource management. """ import asyncio import uuid from dataclasses import dataclass from datetime import datetime, timedelta from enum import Enum from typing import Any, Dict, List, Optional from loguru import logger from shared.account_manager import ( AccountStatus, get_account_manager, ) # Import components from .tiger_process_pool import ( ProcessInfo, ProcessStatus, get_process_pool, ) class LoadBalanceStrategy(Enum): """Load balancing strategy for process allocation.""" ROUND_ROBIN = "round_robin" LEAST_LOADED = "least_loaded" RANDOM = "random" STICKY = "sticky" # Always use same process for same account @dataclass class ProcessMetrics: """Metrics for process performance monitoring.""" process_id: str account_id: str total_tasks: int = 0 successful_tasks: int = 0 failed_tasks: int = 0 average_response_time: float = 0.0 last_error: Optional[str] = None last_error_time: Optional[datetime] = None uptime_seconds: float = 0.0 memory_usage_mb: Optional[float] = None cpu_usage_percent: Optional[float] = None @property def success_rate(self) -> float: """Calculate success rate percentage.""" if self.total_tasks == 0: return 100.0 return (self.successful_tasks / self.total_tasks) * 100.0 @property def error_rate(self) -> float: """Calculate error rate percentage.""" if self.total_tasks == 0: return 0.0 return (self.failed_tasks / self.total_tasks) * 100.0 class ProcessManager: """ Process manager for coordinating Tiger API process pool. Provides high-level interface for managing worker processes, load balancing, health monitoring, and resource management. """ def __init__( self, load_balance_strategy: LoadBalanceStrategy = LoadBalanceStrategy.STICKY, max_concurrent_tasks: int = 10, health_check_interval: float = 30.0, metrics_retention_hours: int = 24, auto_scale_enabled: bool = True, min_processes: int = 1, max_processes_per_account: int = 1, ): """ Initialize process manager. 
Args: load_balance_strategy: Strategy for load balancing max_concurrent_tasks: Maximum concurrent tasks per process health_check_interval: Health check interval in seconds metrics_retention_hours: How long to retain metrics auto_scale_enabled: Enable automatic scaling min_processes: Minimum number of processes to maintain max_processes_per_account: Maximum processes per account """ self.load_balance_strategy = load_balance_strategy self.max_concurrent_tasks = max_concurrent_tasks self.health_check_interval = health_check_interval self.metrics_retention_hours = metrics_retention_hours self.auto_scale_enabled = auto_scale_enabled self.min_processes = min_processes self.max_processes_per_account = max_processes_per_account # Components self.process_pool = get_process_pool() self.account_manager = get_account_manager() # State tracking self.process_metrics: Dict[str, ProcessMetrics] = {} self.task_history: List[Dict[str, Any]] = [] self.round_robin_index = 0 # Background tasks self._health_monitor_task: Optional[asyncio.Task] = None self._metrics_cleanup_task: Optional[asyncio.Task] = None self._running = False logger.info( f"ProcessManager initialized with strategy={load_balance_strategy.value}" ) async def start(self) -> None: """Start the process manager and background tasks.""" try: logger.info("Starting ProcessManager...") # Start process pool await self.process_pool.start() # Start background tasks self._running = True self._health_monitor_task = asyncio.create_task(self._health_monitor_loop()) self._metrics_cleanup_task = asyncio.create_task( self._metrics_cleanup_loop() ) logger.info("ProcessManager started successfully") except Exception as e: logger.error(f"Failed to start ProcessManager: {e}") raise async def stop(self) -> None: """Stop the process manager and clean up resources.""" try: logger.info("Stopping ProcessManager...") self._running = False # Cancel background tasks if self._health_monitor_task: self._health_monitor_task.cancel() try: await self._health_monitor_task except asyncio.CancelledError: pass if self._metrics_cleanup_task: self._metrics_cleanup_task.cancel() try: await self._metrics_cleanup_task except asyncio.CancelledError: pass # Stop process pool await self.process_pool.stop() logger.info("ProcessManager stopped") except Exception as e: logger.error(f"Error stopping ProcessManager: {e}") raise async def execute_api_call( self, account_id: str, method: str, args: List[Any] = None, kwargs: Dict[str, Any] = None, timeout: float = 30.0, ) -> Any: """ Execute API call on appropriate worker process. 
Args: account_id: Tiger account ID method: API method to execute args: Method arguments kwargs: Method keyword arguments timeout: Call timeout in seconds Returns: API call result Raises: RuntimeError: If execution fails """ start_time = datetime.utcnow() task_id = str(uuid.uuid4()) try: logger.debug(f"Executing API call {method} for account {account_id}") # Validate account account = await self.account_manager.get_account_by_id( uuid.UUID(account_id) ) if not account: raise RuntimeError(f"Account {account_id} not found") if account.status != AccountStatus.ACTIVE: raise RuntimeError(f"Account {account.account_number} is not active") # Execute on process pool result = await self.process_pool.execute_task( account_id=account_id, method=method, args=args, kwargs=kwargs, timeout=timeout, ) # Record success execution_time = (datetime.utcnow() - start_time).total_seconds() await self._record_task_completion( task_id, account_id, method, True, execution_time ) logger.debug( f"API call {method} completed successfully in {execution_time:.2f}s" ) return result except Exception as e: # Record failure execution_time = (datetime.utcnow() - start_time).total_seconds() await self._record_task_completion( task_id, account_id, method, False, execution_time, str(e) ) logger.error(f"API call {method} failed: {e}") raise async def get_account_process_status( self, account_id: str ) -> Optional[ProcessInfo]: """Get status of process handling the specified account.""" return await self.process_pool.get_process_status(account_id) async def get_all_process_status(self) -> List[ProcessInfo]: """Get status of all processes.""" return await self.process_pool.get_all_processes() async def get_process_metrics( self, process_id: str = None ) -> Dict[str, ProcessMetrics]: """ Get process performance metrics. Args: process_id: Specific process ID, or None for all processes Returns: Dictionary of process metrics """ if process_id: return {process_id: self.process_metrics.get(process_id)} else: return self.process_metrics.copy() async def get_system_metrics(self) -> Dict[str, Any]: """Get overall system metrics.""" try: processes = await self.get_all_process_status() # Count by status status_counts = {} for process in processes: status = process.status.value status_counts[status] = status_counts.get(status, 0) + 1 # Calculate aggregate metrics total_tasks = sum(m.total_tasks for m in self.process_metrics.values()) total_successful = sum( m.successful_tasks for m in self.process_metrics.values() ) total_failed = sum(m.failed_tasks for m in self.process_metrics.values()) avg_response_time = 0.0 if self.process_metrics: avg_response_time = sum( m.average_response_time for m in self.process_metrics.values() ) / len(self.process_metrics) return { "timestamp": datetime.utcnow().isoformat(), "total_processes": len(processes), "status_counts": status_counts, "total_tasks": total_tasks, "successful_tasks": total_successful, "failed_tasks": total_failed, "success_rate": (total_successful / max(total_tasks, 1)) * 100, "average_response_time": avg_response_time, "load_balance_strategy": self.load_balance_strategy.value, "auto_scale_enabled": self.auto_scale_enabled, } except Exception as e: logger.error(f"Failed to get system metrics: {e}") return {"error": str(e)} async def restart_account_process(self, account_id: str) -> bool: """ Restart process for specific account. 
Args: account_id: Account ID Returns: True if restart was successful """ try: logger.info(f"Restarting process for account {account_id}") success = await self.process_pool.restart_process(account_id) if success: # Reset metrics for the restarted process processes = await self.get_all_process_status() for process in processes: if process.account_id == account_id: self._reset_process_metrics(process.process_id, account_id) break return success except Exception as e: logger.error(f"Failed to restart process for account {account_id}: {e}") return False async def health_check_account(self, account_id: str) -> Dict[str, Any]: """ Perform health check for specific account. Args: account_id: Account ID Returns: Health check results """ try: result = await self.execute_api_call( account_id=account_id, method="health_check", timeout=10.0 ) return { "account_id": account_id, "healthy": True, "details": result, "timestamp": datetime.utcnow().isoformat(), } except Exception as e: logger.error(f"Health check failed for account {account_id}: {e}") return { "account_id": account_id, "healthy": False, "error": str(e), "timestamp": datetime.utcnow().isoformat(), } async def health_check_all_accounts(self) -> List[Dict[str, Any]]: """Perform health check for all active accounts.""" try: # Get all active accounts accounts = await self.account_manager.list_accounts( status=AccountStatus.ACTIVE, include_inactive=False ) # Run health checks concurrently tasks = [self.health_check_account(str(account.id)) for account in accounts] results = await asyncio.gather(*tasks, return_exceptions=True) # Filter out exceptions and convert to results health_results = [] for i, result in enumerate(results): if isinstance(result, Exception): health_results.append( { "account_id": str(accounts[i].id), "healthy": False, "error": str(result), "timestamp": datetime.utcnow().isoformat(), } ) else: health_results.append(result) return health_results except Exception as e: logger.error(f"Failed to run health checks: {e}") return [] # Private methods async def _record_task_completion( self, task_id: str, account_id: str, method: str, success: bool, execution_time: float, error: str = None, ) -> None: """Record task completion for metrics.""" try: # Find process handling this account processes = await self.get_all_process_status() process_id = None for process in processes: if process.account_id == account_id: process_id = process.process_id break if not process_id: return # Initialize metrics if not exists if process_id not in self.process_metrics: self._reset_process_metrics(process_id, account_id) metrics = self.process_metrics[process_id] # Update metrics metrics.total_tasks += 1 if success: metrics.successful_tasks += 1 else: metrics.failed_tasks += 1 metrics.last_error = error metrics.last_error_time = datetime.utcnow() # Update average response time if metrics.total_tasks == 1: metrics.average_response_time = execution_time else: # Exponential moving average alpha = 0.1 metrics.average_response_time = ( alpha * execution_time + (1 - alpha) * metrics.average_response_time ) # Add to history self.task_history.append( { "task_id": task_id, "process_id": process_id, "account_id": account_id, "method": method, "success": success, "execution_time": execution_time, "error": error, "timestamp": datetime.utcnow().isoformat(), } ) except Exception as e: logger.error(f"Failed to record task completion: {e}") def _reset_process_metrics(self, process_id: str, account_id: str) -> None: """Reset metrics for a process.""" 
self.process_metrics[process_id] = ProcessMetrics( process_id=process_id, account_id=account_id ) async def _health_monitor_loop(self) -> None: """Background task for health monitoring.""" logger.info("Health monitor started") while self._running: try: # Update process metrics await self._update_process_metrics() # Check for unhealthy processes await self._check_process_health() # Auto-scale if enabled if self.auto_scale_enabled: await self._auto_scale_processes() await asyncio.sleep(self.health_check_interval) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in health monitor: {e}") await asyncio.sleep(self.health_check_interval) logger.info("Health monitor stopped") async def _metrics_cleanup_loop(self) -> None: """Background task for cleaning up old metrics.""" logger.info("Metrics cleanup started") while self._running: try: # Clean up old task history cutoff_time = datetime.utcnow() - timedelta( hours=self.metrics_retention_hours ) old_count = len(self.task_history) self.task_history = [ task for task in self.task_history if datetime.fromisoformat(task["timestamp"]) > cutoff_time ] new_count = len(self.task_history) if old_count > new_count: logger.debug(f"Cleaned up {old_count - new_count} old task records") # Sleep for an hour before next cleanup await asyncio.sleep(3600) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in metrics cleanup: {e}") await asyncio.sleep(3600) logger.info("Metrics cleanup stopped") async def _update_process_metrics(self) -> None: """Update process metrics with current information.""" try: processes = await self.get_all_process_status() for process in processes: # Initialize metrics if not exists if process.process_id not in self.process_metrics: self._reset_process_metrics(process.process_id, process.account_id) metrics = self.process_metrics[process.process_id] # Update uptime if process.created_at: metrics.uptime_seconds = ( datetime.utcnow() - process.created_at ).total_seconds() # Update system metrics (would need process monitoring) # metrics.memory_usage_mb = get_process_memory(process.pid) # metrics.cpu_usage_percent = get_process_cpu(process.pid) except Exception as e: logger.error(f"Failed to update process metrics: {e}") async def _check_process_health(self) -> None: """Check process health and restart unhealthy processes.""" try: processes = await self.get_all_process_status() for process in processes: metrics = self.process_metrics.get(process.process_id) if not metrics: continue # Check error rate if metrics.total_tasks > 10 and metrics.error_rate > 50: logger.warning( f"Process {process.process_id} has high error rate: {metrics.error_rate:.1f}%" ) await self.restart_account_process(process.account_id) continue # Check if process is stuck if ( process.status == ProcessStatus.BUSY and process.last_heartbeat and (datetime.utcnow() - process.last_heartbeat).total_seconds() > 300 ): logger.warning( f"Process {process.process_id} appears stuck, restarting" ) await self.restart_account_process(process.account_id) continue except Exception as e: logger.error(f"Failed to check process health: {e}") async def _auto_scale_processes(self) -> None: """Auto-scale processes based on load.""" try: # Get current process count and load processes = await self.get_all_process_status() active_processes = [ p for p in processes if p.status in [ProcessStatus.READY, ProcessStatus.BUSY] ] # Simple auto-scaling logic - this could be enhanced if len(active_processes) < self.min_processes: 
logger.info("Process count below minimum, considering scaling up") # Would implement scaling logic here except Exception as e: logger.error(f"Failed to auto-scale processes: {e}") # Global process manager instance _process_manager: Optional[ProcessManager] = None def get_process_manager() -> ProcessManager: """Get global process manager instance.""" global _process_manager if _process_manager is None: _process_manager = ProcessManager() return _process_manager
