Skip to main content
Glama
sqlalchemy.py7.4 kB
""" SQLAlchemy Complete ORM Template Full-featured async SQLAlchemy with models, repositories, and migrations """ from datetime import datetime from typing import Optional, List, TypeVar, Generic from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship from sqlalchemy import String, Text, Boolean, ForeignKey, select, func from contextlib import asynccontextmanager # ============================================ # Configuration # ============================================ DATABASE_URL = "postgresql+asyncpg://user:password@localhost/myapp" engine = create_async_engine(DATABASE_URL, echo=True) async_session = async_sessionmaker(engine, expire_on_commit=False) # ============================================ # Base Model # ============================================ class Base(DeclarativeBase): pass class TimestampMixin: created_at: Mapped[datetime] = mapped_column(default=func.now()) updated_at: Mapped[datetime] = mapped_column(default=func.now(), onupdate=func.now()) # ============================================ # Models # ============================================ class User(Base, TimestampMixin): __tablename__ = "users" id: Mapped[int] = mapped_column(primary_key=True) email: Mapped[str] = mapped_column(String(255), unique=True, index=True) name: Mapped[str] = mapped_column(String(100)) password_hash: Mapped[str] = mapped_column(String(255)) is_active: Mapped[bool] = mapped_column(Boolean, default=True) # Relationships posts: Mapped[List["Post"]] = relationship(back_populates="author", cascade="all, delete-orphan") def __repr__(self) -> str: return f"<User {self.email}>" class Post(Base, TimestampMixin): __tablename__ = "posts" id: Mapped[int] = mapped_column(primary_key=True) title: Mapped[str] = mapped_column(String(255)) content: Mapped[str] = mapped_column(Text) published: Mapped[bool] = mapped_column(Boolean, default=False) author_id: Mapped[int] = mapped_column(ForeignKey("users.id")) # Relationships author: Mapped["User"] = relationship(back_populates="posts") def __repr__(self) -> str: return f"<Post {self.title}>" class Tag(Base): __tablename__ = "tags" id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(String(50), unique=True) # ============================================ # Repository Pattern # ============================================ T = TypeVar("T", bound=Base) class BaseRepository(Generic[T]): def __init__(self, model: type[T], session: AsyncSession): self.model = model self.session = session async def get_by_id(self, id: int) -> Optional[T]: return await self.session.get(self.model, id) async def get_all(self, limit: int = 100, offset: int = 0) -> List[T]: result = await self.session.execute( select(self.model).limit(limit).offset(offset) ) return list(result.scalars().all()) async def create(self, **kwargs) -> T: instance = self.model(**kwargs) self.session.add(instance) await self.session.commit() await self.session.refresh(instance) return instance async def update(self, instance: T, **kwargs) -> T: for key, value in kwargs.items(): setattr(instance, key, value) await self.session.commit() await self.session.refresh(instance) return instance async def delete(self, instance: T) -> None: await self.session.delete(instance) await self.session.commit() async def count(self) -> int: result = await self.session.execute( select(func.count()).select_from(self.model) ) return result.scalar_one() # ============================================ # Specialized Repositories # ============================================ class UserRepository(BaseRepository[User]): def __init__(self, session: AsyncSession): super().__init__(User, session) async def get_by_email(self, email: str) -> Optional[User]: result = await self.session.execute( select(User).where(User.email == email) ) return result.scalar_one_or_none() async def get_active_users(self) -> List[User]: result = await self.session.execute( select(User).where(User.is_active == True) ) return list(result.scalars().all()) async def search(self, query: str) -> List[User]: result = await self.session.execute( select(User).where( User.name.ilike(f"%{query}%") | User.email.ilike(f"%{query}%") ) ) return list(result.scalars().all()) class PostRepository(BaseRepository[Post]): def __init__(self, session: AsyncSession): super().__init__(Post, session) async def get_by_author(self, author_id: int) -> List[Post]: result = await self.session.execute( select(Post).where(Post.author_id == author_id).order_by(Post.created_at.desc()) ) return list(result.scalars().all()) async def get_published(self, limit: int = 20) -> List[Post]: result = await self.session.execute( select(Post) .where(Post.published == True) .order_by(Post.created_at.desc()) .limit(limit) ) return list(result.scalars().all()) async def publish(self, post: Post) -> Post: post.published = True await self.session.commit() return post # ============================================ # Session Management # ============================================ @asynccontextmanager async def get_session(): async with async_session() as session: try: yield session except Exception: await session.rollback() raise # ============================================ # Database Setup # ============================================ async def create_tables(): async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) print("Tables created") async def drop_tables(): async with engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) print("Tables dropped") # ============================================ # Usage Example # ============================================ async def example_usage(): # Create tables await create_tables() async with get_session() as session: user_repo = UserRepository(session) post_repo = PostRepository(session) # Create user user = await user_repo.create( email="john@example.com", name="John Doe", password_hash="hashed_password" ) print(f"Created user: {user}") # Create post post = await post_repo.create( title="Hello World", content="This is my first post!", author_id=user.id ) print(f"Created post: {post}") # Query published = await post_repo.get_published() print(f"Published posts: {len(published)}") if __name__ == "__main__": import asyncio asyncio.run(example_usage())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/millsydotdev/Code-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server