"""
SQLAlchemy Complete ORM Template
Full-featured async SQLAlchemy with models, repositories, and table setup helpers
"""
import os
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional, List, TypeVar, Generic

from sqlalchemy import String, Text, Boolean, ForeignKey, select, func
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
# ============================================
# Configuration
# ============================================
# Connection URL for the async engine. The DATABASE_URL environment variable
# takes precedence so real credentials do not have to live in source control;
# the literal is only the fallback used when the variable is unset or empty.
DATABASE_URL = (
    os.environ.get("DATABASE_URL")
    or "postgresql+asyncpg://user:password@localhost/myapp"
)
# echo=True turns on SQLAlchemy's statement logging for this engine.
engine = create_async_engine(DATABASE_URL, echo=True)
# Session factory bound to the engine; expire_on_commit=False keeps already
# loaded attributes readable after a commit without another round trip.
async_session = async_sessionmaker(engine, expire_on_commit=False)
# ============================================
# Base Model
# ============================================
class Base(DeclarativeBase):
    """Declarative base that every ORM model in this module inherits from."""
class TimestampMixin:
    """Mixin contributing ``created_at`` and ``updated_at`` columns."""

    # Defaults to the SQL ``now()`` function when a row is inserted.
    created_at: Mapped[datetime] = mapped_column(default=func.now())

    # Same insert default, and additionally reset to ``now()`` on every update.
    updated_at: Mapped[datetime] = mapped_column(
        onupdate=func.now(),
        default=func.now(),
    )
# ============================================
# Models
# ============================================
class User(Base, TimestampMixin):
    """ORM model for the ``users`` table."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    # E-mail addresses are unique per user and indexed for lookups.
    email: Mapped[str] = mapped_column(
        String(255),
        index=True,
        unique=True,
    )
    name: Mapped[str] = mapped_column(String(100))
    password_hash: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)

    # One-to-many side of Post.author. The cascade setting makes posts follow
    # their user, including removal of posts detached from this collection.
    posts: Mapped[List["Post"]] = relationship(
        cascade="all, delete-orphan",
        back_populates="author",
    )

    def __repr__(self) -> str:
        """Return a short debug label built from the e-mail address."""
        return f"<User {self.email}>"
class Post(Base, TimestampMixin):
    """ORM model for the ``posts`` table."""

    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(255))
    content: Mapped[str] = mapped_column(Text)
    # New posts start out unpublished.
    published: Mapped[bool] = mapped_column(Boolean, default=False)
    # Foreign key to the owning row in ``users``.
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    # Many-to-one side of User.posts.
    author: Mapped["User"] = relationship(back_populates="posts")

    def __repr__(self) -> str:
        """Return a short debug label built from the title."""
        return f"<Post {self.title}>"
class Tag(Base):
    """ORM model for the ``tags`` table: an id plus a unique name."""

    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)
# ============================================
# Repository Pattern
# ============================================
T = TypeVar("T", bound=Base)


class BaseRepository(Generic[T]):
    """Generic async CRUD helper bound to one model class and one session.

    Write methods (``create``, ``update``, ``delete``) commit immediately.
    When a commit fails, the session is rolled back before the exception is
    re-raised so the same session remains usable by a caller that handles
    the error.
    """

    def __init__(self, model: type[T], session: AsyncSession):
        """Store the model class to operate on and the session to use."""
        self.model = model
        self.session = session

    async def _commit(self) -> None:
        """Commit the session; on failure roll back and re-raise."""
        try:
            await self.session.commit()
        except Exception:
            # A failed flush/commit leaves the session transaction unusable
            # until it is rolled back explicitly.
            await self.session.rollback()
            raise

    async def get_by_id(self, id: int) -> Optional[T]:
        """Return the row with primary key ``id``, or ``None`` if absent."""
        return await self.session.get(self.model, id)

    async def get_all(self, limit: int = 100, offset: int = 0) -> List[T]:
        """Return one page of rows, ordered by primary key.

        The explicit ordering makes ``limit``/``offset`` paging stable:
        without an ORDER BY the database may return rows in any order, so
        consecutive pages could overlap or skip rows.
        """
        statement = (
            select(self.model)
            .order_by(*self.model.__mapper__.primary_key)
            .limit(limit)
            .offset(offset)
        )
        result = await self.session.execute(statement)
        return list(result.scalars().all())

    async def create(self, **kwargs) -> T:
        """Insert a new row built from ``kwargs`` and return it refreshed."""
        instance = self.model(**kwargs)
        self.session.add(instance)
        await self._commit()
        await self.session.refresh(instance)
        return instance

    async def update(self, instance: T, **kwargs) -> T:
        """Assign ``kwargs`` onto ``instance``, commit, and return it refreshed.

        Raises:
            AttributeError: if a key does not name an attribute of the
                instance's class. Checked before anything is modified, so a
                typo cannot be silently stored on the object and dropped.
        """
        # Check against the class, not the instance, so the test never
        # triggers a lazy load of an unloaded attribute.
        unknown = sorted(k for k in kwargs if not hasattr(type(instance), k))
        if unknown:
            raise AttributeError(
                f"{type(instance).__name__} has no attribute(s): "
                f"{', '.join(unknown)}"
            )
        for key, value in kwargs.items():
            setattr(instance, key, value)
        await self._commit()
        await self.session.refresh(instance)
        return instance

    async def delete(self, instance: T) -> None:
        """Delete ``instance`` and commit."""
        await self.session.delete(instance)
        await self._commit()

    async def count(self) -> int:
        """Return the total number of rows for the model."""
        result = await self.session.execute(
            select(func.count()).select_from(self.model)
        )
        return result.scalar_one()
# ============================================
# Specialized Repositories
# ============================================
class UserRepository(BaseRepository[User]):
    """Repository with user-specific queries on top of the generic CRUD."""

    def __init__(self, session: AsyncSession):
        """Bind the repository to ``session`` for the ``User`` model."""
        super().__init__(User, session)

    async def get_by_email(self, email: str) -> Optional[User]:
        """Return the user whose e-mail equals ``email``, or ``None``."""
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def get_active_users(self) -> List[User]:
        """Return every user whose ``is_active`` flag is true."""
        result = await self.session.execute(
            select(User).where(User.is_active.is_(True))
        )
        return list(result.scalars().all())

    async def search(self, query: str) -> List[User]:
        """Return users whose name or e-mail contains ``query``, ignoring case.

        ``query`` is matched literally: the LIKE wildcards ``%`` and ``_``
        (and the escape character itself) are escaped first, so input such
        as ``"%"`` no longer matches every row.
        """
        # Escape the backslash first so the escapes added next stay intact.
        escaped = (
            query.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        pattern = f"%{escaped}%"
        result = await self.session.execute(
            select(User).where(
                User.name.ilike(pattern, escape="\\")
                | User.email.ilike(pattern, escape="\\")
            )
        )
        return list(result.scalars().all())
class PostRepository(BaseRepository[Post]):
    """Repository with post-specific queries on top of the generic CRUD."""

    def __init__(self, session: AsyncSession):
        """Bind the repository to ``session`` for the ``Post`` model."""
        super().__init__(Post, session)

    async def get_by_author(self, author_id: int) -> List[Post]:
        """Return all posts of one author, newest first."""
        result = await self.session.execute(
            select(Post)
            .where(Post.author_id == author_id)
            .order_by(Post.created_at.desc())
        )
        return list(result.scalars().all())

    async def get_published(self, limit: int = 20) -> List[Post]:
        """Return up to ``limit`` published posts, newest first."""
        result = await self.session.execute(
            select(Post)
            .where(Post.published.is_(True))
            .order_by(Post.created_at.desc())
            .limit(limit)
        )
        return list(result.scalars().all())

    async def publish(self, post: Post) -> Post:
        """Mark ``post`` as published, commit, and return it refreshed.

        Goes through ``update`` so the instance is refreshed after the
        commit, the same as every other write in the repository; this
        reloads ``updated_at``, whose new value is produced by the database.
        """
        return await self.update(post, published=True)
# ============================================
# Session Management
# ============================================
@asynccontextmanager
async def get_session():
    """Yield a session from ``async_session`` for the duration of a block.

    If the block raises an ``Exception``, the session is rolled back and the
    exception is re-raised. Nothing is committed here; that is left to the
    code using the session.
    """
    async with async_session() as db_session:
        try:
            yield db_session
        except Exception:
            await db_session.rollback()
            raise
# ============================================
# Database Setup
# ============================================
async def create_tables():
    """Create every table registered on ``Base.metadata``, then report it."""
    async with engine.begin() as connection:
        # create_all is a synchronous API, hence the run_sync bridge.
        await connection.run_sync(Base.metadata.create_all)
    print("Tables created")
async def drop_tables():
    """Drop every table registered on ``Base.metadata``, then report it."""
    async with engine.begin() as connection:
        # drop_all is a synchronous API, hence the run_sync bridge.
        await connection.run_sync(Base.metadata.drop_all)
    print("Tables dropped")
# ============================================
# Usage Example
# ============================================
async def example_usage():
    """Demonstrate the repositories end to end against the configured engine.

    Creates the tables, inserts one user and one post, and prints how many
    posts are published. The engine is disposed at the end, even on error,
    so pooled connections are closed while the event loop is still running.

    NOTE(review): ``users.email`` is unique, so a second run against the same
    database is expected to fail on the user insert.
    """
    try:
        # Create tables
        await create_tables()

        async with get_session() as session:
            user_repo = UserRepository(session)
            post_repo = PostRepository(session)

            # Create user
            user = await user_repo.create(
                email="john@example.com",
                name="John Doe",
                password_hash="hashed_password",
            )
            print(f"Created user: {user}")

            # Create post
            post = await post_repo.create(
                title="Hello World",
                content="This is my first post!",
                author_id=user.id,
            )
            print(f"Created post: {post}")

            # Query
            published = await post_repo.get_published()
            print(f"Published posts: {len(published)}")
    finally:
        # Release pooled connections before asyncio.run() closes the loop.
        await engine.dispose()
if __name__ == "__main__":
    # Script entry point: run the async demo on a fresh event loop.
    from asyncio import run as run_async

    run_async(example_usage())