Skip to main content
Glama
server.py•18.9 kB
""" Tiger MCP Server - Main orchestration class. Orchestrates all components including database, account services, process pool, and MCP tools with proper initialization order and lifecycle management. """ import asyncio import signal import sys from pathlib import Path from typing import Any, Dict, List, Optional from loguru import logger from .config_manager import TigerMCPConfig, get_config_manager # Add paths for shared and database imports _SHARED_PATH = Path(__file__).parent.parent.parent.parent / "shared" / "src" _DATABASE_PATH = Path(__file__).parent.parent.parent.parent / "database" / "src" if str(_SHARED_PATH) not in sys.path: sys.path.insert(0, str(_SHARED_PATH)) if str(_DATABASE_PATH) not in sys.path: sys.path.insert(0, str(_DATABASE_PATH)) class TigerMCPServer: """ Main Tiger MCP Server class. Orchestrates all components including: - Configuration management - Database initialization - Account management services - Process pool management - Background task scheduling - Graceful shutdown handling """ def __init__( self, config_file: Optional[str] = None, environment: Optional[str] = None ): """ Initialize Tiger MCP Server. 
Args: config_file: Optional configuration file path environment: Environment name (development, testing, production) """ self.config_manager = get_config_manager( config_file=config_file, environment=environment ) self.config: Optional[TigerMCPConfig] = None # Service components self.account_manager = None self.account_router = None self.process_manager = None self.tiger_service = None # Background tasks self.background_tasks: List[asyncio.Task] = [] self.shutdown_event = asyncio.Event() self._started = False # Setup signal handlers self._setup_signal_handlers() def _setup_signal_handlers(self): """Setup signal handlers for graceful shutdown.""" if sys.platform != "win32": for sig in (signal.SIGTERM, signal.SIGINT): signal.signal(sig, self._signal_handler) def _signal_handler(self, signum, frame): """Handle shutdown signals.""" logger.info(f"Received signal {signum}, initiating graceful shutdown...") self.shutdown_event.set() async def initialize(self) -> None: """ Initialize all server components. Components are initialized in dependency order: 1. Configuration loading and validation 2. Logging configuration 3. Database initialization 4. Account management services 5. Process pool initialization 6. Tiger API service 7. Background task startup Raises: Exception: If any component fails to initialize """ try: logger.info("Initializing Tiger MCP Server...") # 1. Load and validate configuration logger.info("Loading configuration...") self.config = self.config_manager.load_config() # 2. Configure logging self._configure_logging() # 3. Initialize database logger.info("Initializing database...") await self._initialize_database() # 4. Initialize account management logger.info("Initializing account management...") await self._initialize_account_services() # 5. Initialize process pool logger.info("Initializing process pool...") await self._initialize_process_pool() # 6. 
Initialize Tiger API service logger.info("Initializing Tiger API service...") await self._initialize_tiger_service() # 7. Start background tasks logger.info("Starting background tasks...") await self._start_background_tasks() self._started = True logger.success("Tiger MCP Server initialized successfully") except Exception as e: logger.error(f"Failed to initialize Tiger MCP Server: {e}") await self.cleanup() raise def _configure_logging(self) -> None: """Configure logging based on configuration.""" # Remove default logger logger.remove() # Configure console logging log_format = ( "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | " "<level>{level: <8}</level> | " "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - " "<level>{message}</level>" ) logger.add( sys.stdout, format=log_format, level=self.config.server.log_level, colorize=True, backtrace=True, diagnose=True, ) # Add file logging in production if self.config.environment == "production": logger.add( "logs/tiger_mcp_server.log", format=log_format, level=self.config.server.log_level, rotation="1 day", retention="30 days", compression="gz", ) logger.info(f"Logging configured for {self.config.environment} environment") async def _initialize_database(self) -> None: """Initialize database connection and migrations.""" try: from database import get_db_manager db_manager = get_db_manager() await db_manager.initialize( database_url=self.config.database.url, echo=self.config.database.echo ) logger.info("Database initialized successfully") except ImportError: logger.warning( "Database package not available, running without persistent storage" ) except Exception as e: logger.error(f"Failed to initialize database: {e}") raise async def _initialize_account_services(self) -> None: """Initialize account management services.""" try: from shared.account_manager import get_account_manager from shared.account_router import get_account_router # Initialize account manager self.account_manager = get_account_manager() 
await self.account_manager.initialize() # Initialize account router self.account_router = get_account_router() logger.info("Account services initialized successfully") except ImportError as e: logger.warning(f"Account services not available: {e}") except Exception as e: logger.error(f"Failed to initialize account services: {e}") raise async def _initialize_process_pool(self) -> None: """Initialize Tiger process pool.""" try: from .process_manager import get_process_manager self.process_manager = get_process_manager() # Configure process manager with settings await self.process_manager.configure( min_workers=self.config.process.min_workers, max_workers=self.config.process.max_workers, target_workers=self.config.process.target_workers, startup_timeout=self.config.process.startup_timeout, shutdown_timeout=self.config.process.shutdown_timeout, health_check_interval=self.config.process.health_check_interval, worker_restart_threshold=self.config.process.worker_restart_threshold, load_balance_strategy=self.config.process.load_balance_strategy, ) # Start process pool await self.process_manager.start() logger.info("Process pool initialized successfully") except Exception as e: logger.error(f"Failed to initialize process pool: {e}") raise async def _initialize_tiger_service(self) -> None: """Initialize Tiger API service.""" try: from .example_usage import TigerAPIService self.tiger_service = TigerAPIService() await self.tiger_service.start() logger.info("Tiger API service initialized successfully") except Exception as e: logger.error(f"Failed to initialize Tiger API service: {e}") raise async def _start_background_tasks(self) -> None: """Start background maintenance tasks.""" # Token refresh task if self.config.security.enable_token_validation and self.account_manager: task = asyncio.create_task(self._token_refresh_task()) self.background_tasks.append(task) logger.info("Token refresh task started") # Health monitoring task if self.process_manager: task = 
asyncio.create_task(self._health_monitor_task()) self.background_tasks.append(task) logger.info("Health monitoring task started") # Cleanup task task = asyncio.create_task(self._cleanup_task()) self.background_tasks.append(task) logger.info("Cleanup task started") async def _token_refresh_task(self) -> None: """Background task for token refresh.""" try: while not self.shutdown_event.is_set(): try: if self.account_manager: await self.account_manager.refresh_expiring_tokens( threshold=self.config.security.token_refresh_threshold ) except Exception as e: logger.error(f"Error in token refresh task: {e}") # Wait for next check (with early exit on shutdown) try: await asyncio.wait_for( self.shutdown_event.wait(), timeout=self.config.security.token_refresh_threshold / 2, ) except asyncio.TimeoutError: continue # Normal timeout, continue loop else: break # Shutdown event was set except asyncio.CancelledError: logger.info("Token refresh task cancelled") except Exception as e: logger.error(f"Unexpected error in token refresh task: {e}") async def _health_monitor_task(self) -> None: """Background task for health monitoring.""" try: while not self.shutdown_event.is_set(): try: if self.process_manager: # Perform health checks unhealthy_workers = ( await self.process_manager.check_worker_health() ) if unhealthy_workers: logger.warning( f"Found {len(unhealthy_workers)} unhealthy workers" ) # Check if we need to scale workers await self.process_manager.auto_scale() except Exception as e: logger.error(f"Error in health monitoring task: {e}") # Wait for next check try: await asyncio.wait_for( self.shutdown_event.wait(), timeout=self.config.process.health_check_interval, ) except asyncio.TimeoutError: continue else: break except asyncio.CancelledError: logger.info("Health monitoring task cancelled") except Exception as e: logger.error(f"Unexpected error in health monitoring task: {e}") async def _cleanup_task(self) -> None: """Background task for periodic cleanup.""" try: while not 
self.shutdown_event.is_set(): try: # Cleanup expired sessions, logs, etc. if self.account_manager: await self.account_manager.cleanup_expired_sessions() # Cleanup process metrics if self.process_manager: await self.process_manager.cleanup_old_metrics() except Exception as e: logger.error(f"Error in cleanup task: {e}") # Wait for next cleanup (every hour) try: await asyncio.wait_for( self.shutdown_event.wait(), timeout=3600 # 1 hour ) except asyncio.TimeoutError: continue else: break except asyncio.CancelledError: logger.info("Cleanup task cancelled") except Exception as e: logger.error(f"Unexpected error in cleanup task: {e}") async def wait_for_shutdown(self) -> None: """Wait for shutdown signal.""" await self.shutdown_event.wait() logger.info("Shutdown signal received") async def cleanup(self) -> None: """ Cleanup all server components. Components are cleaned up in reverse dependency order: 1. Stop background tasks 2. Stop Tiger API service 3. Stop process pool 4. Stop account services 5. 
Close database connections """ logger.info("Starting server cleanup...") # Cancel background tasks if self.background_tasks: logger.info("Stopping background tasks...") for task in self.background_tasks: task.cancel() # Wait for tasks to complete with timeout if self.background_tasks: try: await asyncio.wait_for( asyncio.gather(*self.background_tasks, return_exceptions=True), timeout=5.0, ) except asyncio.TimeoutError: logger.warning( "Some background tasks did not complete within timeout" ) self.background_tasks.clear() # Stop Tiger API service if self.tiger_service: try: logger.info("Stopping Tiger API service...") await self.tiger_service.stop() except Exception as e: logger.error(f"Error stopping Tiger API service: {e}") # Stop process pool if self.process_manager: try: logger.info("Stopping process pool...") await self.process_manager.stop() except Exception as e: logger.error(f"Error stopping process pool: {e}") # Cleanup account services if self.account_manager: try: logger.info("Stopping account services...") await self.account_manager.cleanup() except Exception as e: logger.error(f"Error stopping account services: {e}") # Close database connections try: from database import get_db_manager db_manager = get_db_manager() await db_manager.cleanup() logger.info("Database connections closed") except ImportError: pass # Database package not available except Exception as e: logger.error(f"Error closing database connections: {e}") self._started = False logger.info("Server cleanup completed") async def start(self) -> None: """ Start the Tiger MCP Server. Raises: RuntimeError: If server is already started Exception: If initialization fails """ if self._started: raise RuntimeError("Server is already started") await self.initialize() async def stop(self) -> None: """ Stop the Tiger MCP Server. 
Raises: RuntimeError: If server is not started """ if not self._started: raise RuntimeError("Server is not started") self.shutdown_event.set() await self.cleanup() @property def is_started(self) -> bool: """Check if server is started.""" return self._started def get_health_status(self) -> Dict[str, Any]: """ Get current server health status. Returns: Dictionary containing health information """ status = { "server": { "started": self._started, "environment": self.config.environment if self.config else None, "background_tasks": len(self.background_tasks), } } # Add process pool status if self.process_manager: try: status["process_pool"] = { "active_workers": len(self.process_manager.workers), "total_requests": sum( w.request_count for w in self.process_manager.workers.values() ), "failed_requests": sum( w.error_count for w in self.process_manager.workers.values() ), } except Exception: status["process_pool"] = {"error": "Unable to get process pool status"} # Add account manager status if self.account_manager: try: status["accounts"] = { "total_accounts": len(self.account_manager.accounts), "active_accounts": len( [ acc for acc in self.account_manager.accounts.values() if acc.status.value == "active" ] ), } except Exception: status["accounts"] = {"error": "Unable to get account status"} return status async def main(): """Main entry point for the server.""" server = None try: # Create and start server server = TigerMCPServer() await server.start() logger.success("Tiger MCP Server started successfully") # Wait for shutdown signal await server.wait_for_shutdown() except KeyboardInterrupt: logger.info("Received keyboard interrupt") except Exception as e: logger.error(f"Server error: {e}") finally: if server and server.is_started: await server.stop() logger.info("Tiger MCP Server stopped") if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/luxiaolei/tiger-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.