Skip to main content
Glama
database.py6.25 kB
import os from pathlib import Path from sqlmodel import SQLModel, create_engine, Session, text from .models import Setting, LogEntry # Database file path - handle both local and Docker environments if os.getenv('DOCKER_ENV') or Path('/app').exists(): # Docker environment: use /app/db DB_DIR = Path("/app/db") else: # Local development environment: use manager/db DB_DIR = Path(__file__).parent.parent / "db" DB_DIR.mkdir(exist_ok=True) DB_PATH = DB_DIR / "manager.db" # Create database engine, configure WAL mode and related optimizations engine = create_engine( f"sqlite:///{DB_PATH}", echo=False, connect_args={ "check_same_thread": False, # Allow multi-thread access } ) def create_db_and_tables(): """Create database and tables""" SQLModel.metadata.create_all(engine) # Configure WAL mode and related optimizations with engine.connect() as connection: # Enable WAL mode connection.execute(text("PRAGMA journal_mode = WAL;")) # Set synchronous mode to NORMAL for better performance connection.execute(text("PRAGMA synchronous = NORMAL;")) # Set WAL auto-checkpoint size (in pages) connection.execute(text("PRAGMA wal_autocheckpoint = 500;")) # Set cache size (in pages, negative means KB) connection.execute(text("PRAGMA cache_size = -32000;")) # 32MB # Enable foreign key constraints connection.execute(text("PRAGMA foreign_keys = ON;")) connection.commit() def configure_connection_pragmas(connection): """Configure connection PRAGMA settings""" connection.execute(text("PRAGMA journal_mode = WAL;")) connection.execute(text("PRAGMA synchronous = NORMAL;")) connection.execute(text("PRAGMA foreign_keys = ON;")) def get_session(): """Get database session""" with Session(engine) as session: # Ensure each session is configured with correct PRAGMA session.exec(text("PRAGMA journal_mode = WAL;")) session.exec(text("PRAGMA synchronous = NORMAL;")) session.exec(text("PRAGMA foreign_keys = ON;")) yield session async def init_database(): """Initialize database""" create_db_and_tables() # Check if setting record exists, create default if not with Session(engine) as session: from sqlmodel import select existing_setting = session.exec(select(Setting)).first() if not existing_setting: try: # Import configuration modules from config import get_all_config_keys, get_sensitive_config_keys from config.manager import ConfigManager from config.sources import EnvironmentConfigSource, DefaultValueConfigSource # Create a temporary config manager without ManagerBackendSource to avoid circular dependency temp_config_manager = ConfigManager() temp_config_manager._sources = [ EnvironmentConfigSource(), DefaultValueConfigSource() ] default_config = await temp_config_manager.get_config(get_all_config_keys()) print(f"Default config: {default_config}") from .crypto import crypto_manager sensitive_config_keys = get_sensitive_config_keys() for key in sensitive_config_keys: if key in default_config: encrypted_key = f"{key}_encrypted" default_config[encrypted_key] = crypto_manager.encrypt(default_config[key]) default_setting = Setting.model_validate(default_config) session.add(default_setting) session.commit() except Exception as e: print(f"Error creating default setting: {e}") def get_setting(): """Get settings (only one record)""" with Session(engine) as session: from sqlmodel import select setting = session.exec(select(Setting)).first() if not setting: raise ValueError("No setting found. Please initialize the database first.") return setting def update_setting(setting_data: dict): """Update settings""" with Session(engine) as session: from sqlmodel import select setting = session.exec(select(Setting)).first() if not setting: raise ValueError("No setting found. Please initialize the database first.") # Update fields, special handling for encrypted fields from .crypto import crypto_manager from config import get_sensitive_config_keys sensitive_config_keys = get_sensitive_config_keys() # Update field values for key, value in setting_data.items(): if hasattr(setting, key): # Encrypt sensitive fields if key in sensitive_config_keys: # Encrypted fields need to be set to corresponding _encrypted field encrypted_key = f"{key}_encrypted" if hasattr(setting, encrypted_key): setattr(setting, encrypted_key, crypto_manager.encrypt(value)) else: # Regular fields are set directly setattr(setting, key, value) session.add(setting) session.commit() session.refresh(setting) return setting def cleanup_old_logs(): try: from sqlmodel import select from datetime import datetime, timedelta setting = get_setting() days = setting.log_save_days cutoff_time = datetime.now() - timedelta(days=days) # Delete expired logs with Session(engine) as session: delete_query = select(LogEntry).where(LogEntry.timestamp < cutoff_time) old_logs = session.exec(delete_query).all() deleted_count = len(old_logs) for log in old_logs: session.delete(log) session.commit() print(f"Cleaned up {deleted_count} old log entries (older than {days} days)") except Exception as e: print(f"Error during log cleanup: {e}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/itcook/graphiti-mcp-pro'

If you have feedback or need assistance with the MCP directory API, please join our Discord server