Skip to main content
Glama
env.py2.53 kB
"""Alembic environment configuration. This module configures Alembic for running database migrations. """ import asyncio from logging.config import fileConfig from sqlalchemy import pool from sqlalchemy.engine import Connection from sqlalchemy.ext.asyncio import async_engine_from_config from alembic import context from src.core.config import get_settings from src.database.base import Base from src.models.database import ( User, Document, DocumentVersion, Comment, Revision, Template, AuditLog, ) # Alembic Config object config = context.config # Set database URL from settings settings = get_settings() config.set_main_option("sqlalchemy.url", settings.database_url) # Interpret the config file for Python logging if config.config_file_name is not None: fileConfig(config.config_file_name) # Target metadata for autogenerate target_metadata = Base.metadata def run_migrations_offline() -> None: """Run migrations in 'offline' mode. This configures the context with just a URL and not an Engine, though an Engine is acceptable here as well. By skipping the Engine creation we don't even need a DBAPI to be available. Calls to context.execute() here emit the given string to the script output. """ url = config.get_main_option("sqlalchemy.url") context.configure( url=url, target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}, ) with context.begin_transaction(): context.run_migrations() def do_run_migrations(connection: Connection) -> None: """Run migrations with a connection.""" context.configure(connection=connection, target_metadata=target_metadata) with context.begin_transaction(): context.run_migrations() async def run_async_migrations() -> None: """Run migrations in async mode.""" connectable = async_engine_from_config( config.get_section(config.config_ini_section, {}), prefix="sqlalchemy.", poolclass=pool.NullPool, ) async with connectable.connect() as connection: await connection.run_sync(do_run_migrations) await connectable.dispose() def run_migrations_online() -> None: """Run migrations in 'online' mode. In this scenario we need to create an Engine and associate a connection with the context. """ asyncio.run(run_async_migrations()) if context.is_offline_mode(): run_migrations_offline() else: run_migrations_online()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Fu-Jie/MCP-OPENAPI-DOCX'

If you have feedback or need assistance with the MCP directory API, please join our Discord server