Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
database_self_contained.py13 kB
"""
Self-contained database configuration for Maverick-MCP.

This module provides database configuration that is completely independent
of external Django projects, using only mcp_ prefixed tables.
"""

import logging
import os

from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

from maverick_mcp.config.database import (
    DatabasePoolConfig,
    get_pool_config_from_settings,
)
from maverick_mcp.data.models import Base

logger = logging.getLogger("maverick_mcp.config.database_self_contained")


class SelfContainedDatabaseConfig:
    """Configuration for self-contained Maverick-MCP database.

    Owns a lazily created SQLAlchemy engine and session factory for a single
    database URL, plus helpers to create tables, validate the ``mcp_`` schema
    and report row counts.
    """

    def __init__(
        self,
        database_url: str | None = None,
        pool_config: DatabasePoolConfig | None = None,
    ):
        """
        Initialize self-contained database configuration.

        Args:
            database_url: Database connection URL. If None, will use
                environment variables
            pool_config: Database pool configuration. If None, will use
                settings-based config
        """
        self.database_url = database_url or self._get_database_url()
        self.pool_config = pool_config or get_pool_config_from_settings()
        # Both are created on first use by create_engine() /
        # create_session_factory() and reset to None by close().
        self.engine: Engine | None = None
        self.SessionLocal: sessionmaker | None = None

    def _get_database_url(self) -> str:
        """Get database URL from environment variables.

        Returns:
            ``sqlite:///:memory:`` when ``GITHUB_ACTIONS`` or ``CI`` is
            ``"true"``; otherwise the first non-empty value among
            ``DATABASE_URL``, ``MCP_DATABASE_URL`` and ``POSTGRES_URL``,
            falling back to a local SQLite file.
        """
        # Use SQLite in-memory for GitHub Actions or test environments
        if os.getenv("GITHUB_ACTIONS") == "true" or os.getenv("CI") == "true":
            return "sqlite:///:memory:"

        # Try multiple possible environment variable names
        return (
            os.getenv("DATABASE_URL")  # Prefer standard DATABASE_URL
            or os.getenv("MCP_DATABASE_URL")
            or os.getenv("POSTGRES_URL")
            or "sqlite:///maverick_mcp.db"  # Default to SQLite for development
        )

    def create_engine(self) -> Engine:
        """Create and configure the database engine.

        The engine is created once and cached on ``self.engine``; later calls
        return the cached instance. ``DB_USE_POOLING`` (default ``"true"``)
        selects between the pool settings supplied by ``self.pool_config``
        and ``NullPool``; ``DB_ECHO`` (default ``"false"``) toggles SQL echo.

        Returns:
            The cached or newly created engine.
        """
        if self.engine is not None:
            return self.engine

        # Log database connection (without password)
        masked_url = self._mask_database_url(self.database_url)
        logger.info(f"Creating self-contained database engine: {masked_url}")

        # Determine if we should use connection pooling
        use_pooling = os.getenv("DB_USE_POOLING", "true").lower() == "true"
        echo = os.getenv("DB_ECHO", "false").lower() == "true"

        if use_pooling:
            # Pool settings come from the pool config; connect_args are only
            # applied on this pooled path.
            engine_kwargs = {
                **self.pool_config.get_pool_kwargs(),
                "connect_args": self._get_connect_args(),
                "echo": echo,
            }
        else:
            # Use NullPool for serverless/development environments
            engine_kwargs = {
                "poolclass": NullPool,
                "echo": echo,
            }

        self.engine = create_engine(self.database_url, **engine_kwargs)

        # Set up pool monitoring if using pooled connections
        if use_pooling:
            self.pool_config.setup_pool_monitoring(self.engine)

        logger.info("Self-contained database engine created successfully")
        return self.engine

    def _mask_database_url(self, url: str) -> str:
        """Mask password in database URL for logging.

        Args:
            url: Database URL, possibly of the form
                ``scheme://user:password@host/db``.

        Returns:
            The URL with the password replaced by ``****``. URLs without a
            ``user:password@`` section are returned unchanged.
        """
        if "@" in url and "://" in url:
            parts = url.split("://", 1)
            if len(parts) == 2 and "@" in parts[1]:
                # Split on the first "@": everything before it is treated as
                # the credentials section.
                user_pass, host_db = parts[1].split("@", 1)
                if ":" in user_pass:
                    user, _ = user_pass.split(":", 1)
                    return f"{parts[0]}://{user}:****@{host_db}"
        return url

    def _get_connect_args(self) -> dict:
        """Get connection arguments for the database engine.

        Returns:
            Timeout and application-name settings when the URL contains
            ``"postgresql"``; an empty dict for any other URL.
        """
        if "postgresql" in self.database_url:
            return {
                "connect_timeout": 10,
                "application_name": "maverick_mcp_self_contained",
                "options": "-c statement_timeout=30000",  # 30 seconds
            }
        return {}

    def create_session_factory(self) -> sessionmaker:
        """Create session factory.

        Creates the engine first if needed. The factory is cached on
        ``self.SessionLocal`` and reused by later calls.

        Returns:
            The cached or newly created ``sessionmaker``.
        """
        if self.SessionLocal is not None:
            return self.SessionLocal

        if self.engine is None:
            self.create_engine()

        self.SessionLocal = sessionmaker(
            autocommit=False, autoflush=False, bind=self.engine
        )

        logger.info("Session factory created for self-contained database")
        return self.SessionLocal

    def create_tables(self, drop_first: bool = False) -> None:
        """
        Create all tables in the database.

        Args:
            drop_first: If True, drop all tables first (useful for testing)
        """
        if self.engine is None:
            self.create_engine()

        if drop_first:
            logger.warning("Dropping all tables first (drop_first=True)")
            Base.metadata.drop_all(bind=self.engine)

        logger.info("Creating all self-contained tables...")
        Base.metadata.create_all(bind=self.engine)
        logger.info("All self-contained tables created successfully")

    def validate_schema(self) -> bool:
        """
        Validate that all expected tables exist with mcp_ prefix.

        Only PostgreSQL and SQLite URLs are supported. Unexpected extra
        ``mcp_`` tables produce a warning but do not fail validation. Any
        exception raised while querying is logged and reported as failure.

        Returns:
            True if schema is valid, False otherwise
        """
        if self.engine is None:
            self.create_engine()

        expected_tables = {
            "mcp_stocks",
            "mcp_price_cache",
            "mcp_maverick_stocks",
            "mcp_maverick_bear_stocks",
            "mcp_supply_demand_breakouts",
            "mcp_technical_cache",
            "mcp_users",  # From auth models
            "mcp_api_keys",  # From auth models
            "mcp_refresh_tokens",  # From auth models
        }

        try:
            # Get list of tables in database
            with self.engine.connect() as conn:
                if "postgresql" in self.database_url:
                    result = conn.execute(
                        text("""
                            SELECT table_name
                            FROM information_schema.tables
                            WHERE table_schema = 'public'
                            AND table_name LIKE 'mcp_%'
                        """)
                    )
                elif "sqlite" in self.database_url:
                    result = conn.execute(
                        text("""
                            SELECT name
                            FROM sqlite_master
                            WHERE type='table'
                            AND name LIKE 'mcp_%'
                        """)
                    )
                else:
                    # Mask the URL: it may embed credentials that must not
                    # reach the logs.
                    masked_url = self._mask_database_url(self.database_url)
                    logger.error(f"Unsupported database type: {masked_url}")
                    return False

                existing_tables = {row[0] for row in result.fetchall()}

            # Check if all expected tables exist
            missing_tables = expected_tables - existing_tables
            extra_tables = existing_tables - expected_tables

            if missing_tables:
                logger.error(f"Missing expected tables: {missing_tables}")
                return False

            if extra_tables:
                logger.warning(f"Found unexpected mcp_ tables: {extra_tables}")

            logger.info(
                f"Schema validation passed. Found {len(existing_tables)} mcp_ tables"
            )
            return True

        except Exception as e:
            logger.error(f"Schema validation failed: {e}")
            return False

    def get_database_stats(self) -> dict:
        """Get statistics about the self-contained database.

        Returns:
            A dict with the masked ``database_url``, the dumped
            ``pool_config``, a ``tables`` mapping of table name to row count
            (or an ``"Error: ..."`` string if that table's count failed) and
            ``total_records``. If the connection itself fails, an ``error``
            key holds the message.
        """
        if self.engine is None:
            self.create_engine()

        stats = {
            "database_url": self._mask_database_url(self.database_url),
            "pool_config": self.pool_config.model_dump() if self.pool_config else None,
            "tables": {},
            "total_records": 0,
        }

        # Fixed, trusted table names; safe to interpolate into the query.
        counted_tables = (
            "mcp_stocks",
            "mcp_price_cache",
            "mcp_maverick_stocks",
            "mcp_maverick_bear_stocks",
            "mcp_supply_demand_breakouts",
            "mcp_technical_cache",
        )

        try:
            with self.engine.connect() as conn:
                for table in counted_tables:
                    try:
                        # Run each count in its own transaction so a failure
                        # (e.g. a missing table) is rolled back and does not
                        # leave the connection in an aborted-transaction
                        # state that would fail every later count.
                        with conn.begin():
                            count = conn.execute(
                                text(f"SELECT COUNT(*) FROM {table}")
                            ).scalar()
                        stats["tables"][table] = count
                        stats["total_records"] += count
                    except Exception as e:
                        stats["tables"][table] = f"Error: {e}"
        except Exception as e:
            stats["error"] = str(e)

        return stats

    def close(self) -> None:
        """Close database connections.

        Disposes the engine (if any) and clears the cached engine and session
        factory so they are recreated on next use.
        """
        if self.engine:
            self.engine.dispose()
            self.engine = None
            self.SessionLocal = None
            logger.info("Self-contained database connections closed")


# Global instance for easy access
_db_config: SelfContainedDatabaseConfig | None = None


def get_self_contained_db_config() -> SelfContainedDatabaseConfig:
    """Get or create the global self-contained database configuration."""
    global _db_config
    if _db_config is None:
        _db_config = SelfContainedDatabaseConfig()
    return _db_config


def get_self_contained_engine() -> Engine:
    """Get the self-contained database engine."""
    return get_self_contained_db_config().create_engine()


def get_self_contained_session_factory() -> sessionmaker:
    """Get the self-contained session factory."""
    return get_self_contained_db_config().create_session_factory()


def init_self_contained_database(
    database_url: str | None = None,
    create_tables: bool = True,
    validate_schema: bool = True,
) -> SelfContainedDatabaseConfig:
    """
    Initialize the self-contained database.

    When ``database_url`` is given, the global configuration is replaced by a
    new one for that URL; otherwise the existing global configuration is
    reused (or created). A failed schema validation is logged as a warning
    and does not abort initialization.

    Args:
        database_url: Optional database URL override
        create_tables: Whether to create tables if they don't exist
        validate_schema: Whether to validate the schema after initialization

    Returns:
        Configured SelfContainedDatabaseConfig instance
    """
    global _db_config

    if database_url:
        _db_config = SelfContainedDatabaseConfig(database_url=database_url)
    else:
        _db_config = get_self_contained_db_config()

    # Create engine and session factory
    _db_config.create_engine()
    _db_config.create_session_factory()

    if create_tables:
        _db_config.create_tables()

    if validate_schema:
        if not _db_config.validate_schema():
            logger.warning("Schema validation failed, but continuing...")

    logger.info("Self-contained database initialized successfully")
    return _db_config


# Context manager for database sessions
class SelfContainedDatabaseSession:
    """Context manager for self-contained database sessions.

    Commits on a clean exit, rolls back if the body (or the commit) raises,
    and always closes the session afterwards.
    """

    def __init__(self):
        self.session_factory = get_self_contained_session_factory()
        self.session = None

    def __enter__(self):
        """Open a new session from the factory and return it."""
        self.session = self.session_factory()
        return self.session

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Commit or roll back, then close the session.

        Exceptions from the ``with`` body are never suppressed; a failing
        commit is rolled back and re-raised.
        """
        if self.session is None:
            return

        try:
            if exc_type is not None:
                self.session.rollback()
            else:
                try:
                    self.session.commit()
                except Exception:
                    self.session.rollback()
                    raise
        finally:
            # Close on every path, including when the body raised, so the
            # session's connection is always released.
            self.session.close()


def get_self_contained_db_session():
    """Get a context manager for self-contained database sessions."""
    return SelfContainedDatabaseSession()


# Migration helper
def run_self_contained_migrations(alembic_config_path: str = "alembic.ini"):
    """
    Run migrations to ensure schema is up to date.

    Args:
        alembic_config_path: Path to alembic configuration file

    Raises:
        ImportError: If Alembic is not installed.
        Exception: Any error raised while running the upgrade is logged and
            re-raised.
    """
    try:
        from alembic import command
        from alembic.config import Config

        # Set up alembic config
        alembic_cfg = Config(alembic_config_path)

        # Override database URL with self-contained URL
        db_config = get_self_contained_db_config()
        # Alembic stores options through ConfigParser interpolation, so a
        # literal "%" (e.g. from a URL-encoded password) must be doubled.
        alembic_cfg.set_main_option(
            "sqlalchemy.url", db_config.database_url.replace("%", "%%")
        )

        logger.info("Running self-contained database migrations...")
        command.upgrade(alembic_cfg, "head")
        logger.info("Self-contained database migrations completed successfully")

    except ImportError:
        logger.error("Alembic not available. Cannot run migrations.")
        raise
    except Exception as e:
        logger.error(f"Migration failed: {e}")
        raise

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.