Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
session_management.py9.22 kB
""" Enhanced database session management with context managers. This module provides robust context managers for database session management that guarantee proper cleanup, automatic rollback on errors, and connection pool monitoring to prevent connection leaks. Addresses Issue #55: Implement Proper Database Session Management with Context Managers """ import logging from collections.abc import AsyncGenerator, Generator from contextlib import asynccontextmanager, contextmanager from typing import Any from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Session from maverick_mcp.data.models import ( SessionLocal, _get_async_session_factory, ) logger = logging.getLogger(__name__) @contextmanager def get_db_session() -> Generator[Session, None, None]: """ Enhanced sync database session context manager. Provides: - Automatic session cleanup - Auto-commit on success - Auto-rollback on exceptions - Guaranteed session.close() even if commit/rollback fails Usage: with get_db_session() as session: # Perform database operations result = session.query(Model).all() # Session is automatically committed and closed Returns: Database session that will be properly managed Raises: Exception: Re-raises any database exceptions after rollback """ session = SessionLocal() try: yield session session.commit() logger.debug("Database session committed successfully") except Exception as e: session.rollback() logger.warning(f"Database session rolled back due to error: {e}") raise finally: session.close() logger.debug("Database session closed") @asynccontextmanager async def get_async_db_session() -> AsyncGenerator[AsyncSession, None]: """ Enhanced async database session context manager. Provides: - Automatic session cleanup for async operations - Auto-commit on success - Auto-rollback on exceptions - Guaranteed session.close() even if commit/rollback fails Usage: async with get_async_db_session() as session: # Perform async database operations result = await session.execute(query) # Session is automatically committed and closed Returns: Async database session that will be properly managed Raises: Exception: Re-raises any database exceptions after rollback """ async_session_factory = _get_async_session_factory() async with async_session_factory() as session: try: yield session await session.commit() logger.debug("Async database session committed successfully") except Exception as e: await session.rollback() logger.warning(f"Async database session rolled back due to error: {e}") raise finally: await session.close() logger.debug("Async database session closed") @contextmanager def get_db_session_read_only() -> Generator[Session, None, None]: """ Enhanced sync database session context manager for read-only operations. Optimized for read-only operations: - No auto-commit (read-only) - Rollback on any exception - Guaranteed session cleanup Usage: with get_db_session_read_only() as session: # Perform read-only database operations result = session.query(Model).all() # Session is automatically closed (no commit) Returns: Database session configured for read-only operations Raises: Exception: Re-raises any database exceptions after rollback """ session = SessionLocal() try: yield session # No commit for read-only operations logger.debug("Read-only database session completed successfully") except Exception as e: session.rollback() logger.warning(f"Read-only database session rolled back due to error: {e}") raise finally: session.close() logger.debug("Read-only database session closed") @asynccontextmanager async def get_async_db_session_read_only() -> AsyncGenerator[AsyncSession, None]: """ Enhanced async database session context manager for read-only operations. Optimized for read-only operations: - No auto-commit (read-only) - Rollback on any exception - Guaranteed session cleanup Usage: async with get_async_db_session_read_only() as session: # Perform read-only async database operations result = await session.execute(query) # Session is automatically closed (no commit) Returns: Async database session configured for read-only operations Raises: Exception: Re-raises any database exceptions after rollback """ async_session_factory = _get_async_session_factory() async with async_session_factory() as session: try: yield session # No commit for read-only operations logger.debug("Read-only async database session completed successfully") except Exception as e: await session.rollback() logger.warning( f"Read-only async database session rolled back due to error: {e}" ) raise finally: await session.close() logger.debug("Read-only async database session closed") def get_connection_pool_status() -> dict[str, Any]: """ Get current connection pool status for monitoring. Returns: Dictionary containing pool metrics: - pool_size: Current pool size - checked_in: Number of connections currently checked in - checked_out: Number of connections currently checked out - overflow: Number of connections beyond pool_size - invalid: Number of invalid connections """ from maverick_mcp.data.models import engine pool = engine.pool return { "pool_size": getattr(pool, "size", lambda: 0)(), "checked_in": getattr(pool, "checkedin", lambda: 0)(), "checked_out": getattr(pool, "checkedout", lambda: 0)(), "overflow": getattr(pool, "overflow", lambda: 0)(), "invalid": getattr(pool, "invalid", lambda: 0)(), "pool_status": "healthy" if getattr(pool, "checkedout", lambda: 0)() < getattr(pool, "size", lambda: 10)() * 0.8 else "warning", } async def get_async_connection_pool_status() -> dict[str, Any]: """ Get current async connection pool status for monitoring. Returns: Dictionary containing async pool metrics """ from maverick_mcp.data.models import _get_async_engine engine = _get_async_engine() pool = engine.pool return { "pool_size": getattr(pool, "size", lambda: 0)(), "checked_in": getattr(pool, "checkedin", lambda: 0)(), "checked_out": getattr(pool, "checkedout", lambda: 0)(), "overflow": getattr(pool, "overflow", lambda: 0)(), "invalid": getattr(pool, "invalid", lambda: 0)(), "pool_status": "healthy" if getattr(pool, "checkedout", lambda: 0)() < getattr(pool, "size", lambda: 10)() * 0.8 else "warning", } def check_connection_pool_health() -> bool: """ Check if connection pool is healthy. Returns: True if pool is healthy, False if approaching limits """ try: status = get_connection_pool_status() pool_utilization = ( status["checked_out"] / status["pool_size"] if status["pool_size"] > 0 else 0 ) # Consider unhealthy if > 80% utilization if pool_utilization > 0.8: logger.warning(f"High connection pool utilization: {pool_utilization:.2%}") return False # Check for invalid connections if status["invalid"] > 0: logger.warning(f"Invalid connections detected: {status['invalid']}") return False return True except Exception as e: logger.error(f"Failed to check connection pool health: {e}") return False async def check_async_connection_pool_health() -> bool: """ Check if async connection pool is healthy. Returns: True if pool is healthy, False if approaching limits """ try: status = await get_async_connection_pool_status() pool_utilization = ( status["checked_out"] / status["pool_size"] if status["pool_size"] > 0 else 0 ) # Consider unhealthy if > 80% utilization if pool_utilization > 0.8: logger.warning( f"High async connection pool utilization: {pool_utilization:.2%}" ) return False # Check for invalid connections if status["invalid"] > 0: logger.warning(f"Invalid async connections detected: {status['invalid']}") return False return True except Exception as e: logger.error(f"Failed to check async connection pool health: {e}") return False

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server