Skip to main content
Glama

Taiwan Stock Agent

by clsung
lifecycle_manager.py12.6 kB
"""Application lifecycle management for graceful startup and shutdown.""" import asyncio import atexit import logging import signal import sys from contextlib import asynccontextmanager from typing import Any, AsyncIterator, Callable, Dict, List, Optional from tw_stock_agent.utils.connection_pool import HTTPConnectionPool, close_global_pool from tw_stock_agent.utils.database_pool import AsyncDatabasePool from tw_stock_agent.utils.performance_monitor import PerformanceMonitor, stop_global_monitoring logger = logging.getLogger("tw-stock-agent.lifecycle_manager") class LifecycleManager: """Manages application lifecycle, resource initialization, and graceful shutdown""" def __init__(self): self._startup_hooks: List[Callable[[], Any]] = [] self._shutdown_hooks: List[Callable[[], Any]] = [] self._async_startup_hooks: List[Callable[[], Any]] = [] self._async_shutdown_hooks: List[Callable[[], Any]] = [] self._resources: List[Any] = [] self._shutdown_initiated = False self._shutdown_timeout = 30.0 # seconds # Register signal handlers for graceful shutdown self._register_signal_handlers() # Register atexit handler as fallback atexit.register(self._emergency_shutdown) def _register_signal_handlers(self) -> None: """Register signal handlers for graceful shutdown""" def signal_handler(signum, frame): logger.info(f"Received signal {signum}, initiating graceful shutdown...") if not self._shutdown_initiated: asyncio.create_task(self.shutdown()) if sys.platform != "win32": signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) def add_startup_hook(self, hook: Callable[[], Any]) -> None: """Add a synchronous startup hook""" self._startup_hooks.append(hook) def add_shutdown_hook(self, hook: Callable[[], Any]) -> None: """Add a synchronous shutdown hook""" self._shutdown_hooks.append(hook) def add_async_startup_hook(self, hook: Callable[[], Any]) -> None: """Add an asynchronous startup hook""" self._async_startup_hooks.append(hook) def add_async_shutdown_hook(self, hook: Callable[[], Any]) -> None: """Add an asynchronous shutdown hook""" self._async_shutdown_hooks.append(hook) def register_resource(self, resource: Any) -> None: """Register a resource that needs cleanup on shutdown""" self._resources.append(resource) async def startup(self) -> None: """Execute all startup hooks""" logger.info("Starting application lifecycle...") # Execute synchronous startup hooks for hook in self._startup_hooks: try: logger.debug(f"Executing startup hook: {hook.__name__}") hook() except Exception as e: logger.error(f"Startup hook {hook.__name__} failed: {e}") raise # Execute asynchronous startup hooks for hook in self._async_startup_hooks: try: logger.debug(f"Executing async startup hook: {hook.__name__}") if asyncio.iscoroutinefunction(hook): await hook() else: hook() except Exception as e: logger.error(f"Async startup hook {hook.__name__} failed: {e}") raise logger.info("Application startup completed") async def shutdown(self) -> None: """Execute all shutdown hooks with timeout""" if self._shutdown_initiated: logger.warning("Shutdown already initiated") return self._shutdown_initiated = True logger.info("Starting graceful shutdown...") try: # Execute shutdown with timeout await asyncio.wait_for(self._execute_shutdown(), timeout=self._shutdown_timeout) logger.info("Graceful shutdown completed") except asyncio.TimeoutError: logger.error(f"Shutdown timeout after {self._shutdown_timeout}s, forcing exit") self._force_shutdown() except Exception as e: logger.error(f"Error during shutdown: {e}") self._force_shutdown() async def _execute_shutdown(self) -> None: """Execute shutdown hooks""" # Execute asynchronous shutdown hooks first for hook in self._async_shutdown_hooks: try: logger.debug(f"Executing async shutdown hook: {hook.__name__}") if asyncio.iscoroutinefunction(hook): await hook() else: hook() except Exception as e: logger.error(f"Async shutdown hook {hook.__name__} failed: {e}") # Clean up registered resources for resource in self._resources: try: logger.debug(f"Cleaning up resource: {type(resource).__name__}") if hasattr(resource, 'close'): if asyncio.iscoroutinefunction(resource.close): await resource.close() else: resource.close() elif hasattr(resource, '__aenter__') and hasattr(resource, '__aexit__'): # It's an async context manager, try to exit await resource.__aexit__(None, None, None) except Exception as e: logger.error(f"Failed to cleanup resource {type(resource).__name__}: {e}") # Execute synchronous shutdown hooks for hook in self._shutdown_hooks: try: logger.debug(f"Executing shutdown hook: {hook.__name__}") hook() except Exception as e: logger.error(f"Shutdown hook {hook.__name__} failed: {e}") def _emergency_shutdown(self) -> None: """Emergency shutdown called by atexit""" if not self._shutdown_initiated: logger.warning("Emergency shutdown initiated via atexit") self._force_shutdown() def _force_shutdown(self) -> None: """Force immediate shutdown""" logger.warning("Forcing immediate shutdown") # Try to clean up resources quickly for resource in self._resources: try: if hasattr(resource, 'close'): resource.close() except: pass # Ignore errors during force shutdown # Cancel all running tasks try: loop = asyncio.get_event_loop() if loop.is_running(): tasks = [task for task in asyncio.all_tasks(loop) if not task.done()] for task in tasks: task.cancel() except: pass @asynccontextmanager async def lifespan(self) -> AsyncIterator[None]: """Async context manager for complete application lifecycle""" try: await self.startup() yield finally: await self.shutdown() class ConnectionPoolManager: """Manages connection pools for the application""" def __init__(self): self.http_pool: Optional[HTTPConnectionPool] = None self.db_pool: Optional[AsyncDatabasePool] = None self.performance_monitor: Optional[PerformanceMonitor] = None self._initialized = False async def initialize(self, db_path: str) -> None: """Initialize all connection pools""" if self._initialized: return logger.info("Initializing connection pools...") # Initialize HTTP connection pool self.http_pool = HTTPConnectionPool() logger.info("HTTP connection pool initialized") # Initialize database connection pool self.db_pool = AsyncDatabasePool(db_path) logger.info(f"Database connection pool initialized for {db_path}") # Initialize performance monitoring from tw_stock_agent.utils.performance_monitor import get_global_monitor self.performance_monitor = get_global_monitor() # Register pools with performance monitor if self.http_pool and self.performance_monitor: self.performance_monitor.register_http_pool(self.http_pool.get_metrics) if self.db_pool and self.performance_monitor: self.performance_monitor.register_db_pool(self.db_pool.get_metrics) # Start performance monitoring await self.performance_monitor.start_monitoring() logger.info("Performance monitoring started") # Warm up HTTP connections to common endpoints if self.http_pool: warmup_urls = [ "https://www.twse.com.tw", "https://isin.twse.com.tw", "https://www.tpex.org.tw", ] await self.http_pool.warm_up(warmup_urls) self._initialized = True logger.info("Connection pools initialization completed") async def health_check(self) -> Dict[str, bool]: """Perform health check on all connection pools""" results = {} if self.http_pool: results['http_pool'] = await self.http_pool.health_check() if self.db_pool: results['db_pool'] = await self.db_pool.health_check() return results def get_performance_summary(self) -> Dict[str, Any]: """Get performance summary from all pools""" if self.performance_monitor: return self.performance_monitor.get_performance_summary() return {"status": "monitoring_disabled"} async def close(self) -> None: """Close all connection pools""" if not self._initialized: return logger.info("Closing connection pools...") # Stop performance monitoring if self.performance_monitor: await self.performance_monitor.stop_monitoring() # Close HTTP pool if self.http_pool: await self.http_pool.close() logger.info("HTTP connection pool closed") # Close database pool if self.db_pool: await self.db_pool.close() logger.info("Database connection pool closed") # Close global pools await close_global_pool() await stop_global_monitoring() self._initialized = False logger.info("All connection pools closed") # Global instances _lifecycle_manager: Optional[LifecycleManager] = None _pool_manager: Optional[ConnectionPoolManager] = None def get_lifecycle_manager() -> LifecycleManager: """Get or create the global lifecycle manager""" global _lifecycle_manager if _lifecycle_manager is None: _lifecycle_manager = LifecycleManager() return _lifecycle_manager def get_pool_manager() -> ConnectionPoolManager: """Get or create the global connection pool manager""" global _pool_manager if _pool_manager is None: _pool_manager = ConnectionPoolManager() return _pool_manager @asynccontextmanager async def application_lifespan(db_path: str) -> AsyncIterator[ConnectionPoolManager]: """Complete application lifespan context manager""" lifecycle = get_lifecycle_manager() pool_manager = get_pool_manager() # Register pool manager cleanup with lifecycle manager lifecycle.add_async_shutdown_hook(pool_manager.close) async with lifecycle.lifespan(): # Initialize connection pools await pool_manager.initialize(db_path) yield pool_manager # Convenience functions for common lifecycle operations async def initialize_application(db_path: str) -> ConnectionPoolManager: """Initialize the application with all connection pools""" pool_manager = get_pool_manager() await pool_manager.initialize(db_path) return pool_manager async def shutdown_application() -> None: """Gracefully shutdown the application""" lifecycle = get_lifecycle_manager() await lifecycle.shutdown() def setup_signal_handlers() -> None: """Setup signal handlers for graceful shutdown""" get_lifecycle_manager() # This will register signal handlers

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/clsung/tw-stock-agent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server