from typing import Generic, TypeVar, Type, List, Optional, Any, Dict, Union
from sqlalchemy import select, update, delete
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from app.models.database import Base
# Type variable for SQLAlchemy models
ModelType = TypeVar("ModelType", bound=Base)
# Type variable for Pydantic schemas
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class BaseRepository(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
"""Base repository implementing common CRUD operations.
This class provides base functionality for all repositories,
implementing the repository pattern for database access.
"""
def __init__(self, model: Type[ModelType]):
"""Initialize with the SQLAlchemy model class."""
self.model = model
async def get(self, db: AsyncSession, id: str) -> Optional[ModelType]:
"""Get a single record by ID."""
query = select(self.model).where(self.model.id == id)
result = await db.execute(query)
return result.scalars().first()
async def get_multi(
self, db: AsyncSession, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
"""Get multiple records with pagination."""
query = select(self.model).offset(skip).limit(limit)
result = await db.execute(query)
return result.scalars().all()
async def create(self, db: AsyncSession, *, obj_in: Union[CreateSchemaType, Dict[str, Any]]) -> ModelType:
"""Create a new record."""
if isinstance(obj_in, dict):
obj_data = obj_in
else:
obj_data = obj_in.model_dump()
db_obj = self.model(**obj_data)
db.add(db_obj)
await db.commit()
await db.refresh(db_obj)
return db_obj
async def update(
self, db: AsyncSession, *, db_obj: ModelType, obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
"""Update an existing record."""
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_obj, field, value)
db.add(db_obj)
await db.commit()
await db.refresh(db_obj)
return db_obj
async def delete(self, db: AsyncSession, *, id: str) -> Optional[ModelType]:
"""Delete a record by ID."""
obj = await self.get(db=db, id=id)
if obj:
await db.delete(obj)
await db.commit()
return obj
async def exists(self, db: AsyncSession, *, id: str) -> bool:
"""Check if a record exists by ID."""
query = select(self.model).where(self.model.id == id)
result = await db.execute(query)
return result.scalars().first() is not None