auths.pyโข3.66 kB
import os
import secrets
import jwt
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, HTTPException, status, Query, Request
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from argon2 import PasswordHasher # Argon2 hashing
from argon2.exceptions import VerifyMismatchError
from mcp_host.models.users import User as UserModel
from mcp_host.database.db import get_db
# JWT config
SECRET_KEY = os.environ.get("JWT_SECRET_KEY", "insecure-default-secret")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
# Argon2 hasher instance (thread-safe)
ph = PasswordHasher()
def hash_password(password: str) -> str:
return ph.hash(password)
def verify_password(plain_password: str, stored_hash: Optional[str]) -> bool:
if not stored_hash:
return False
try:
ph.verify(stored_hash, plain_password)
return True
except VerifyMismatchError:
return False
except Exception:
return False
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + (
expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_user_by_email(db: AsyncSession, email: str):
result = await db.execute(select(UserModel).where(UserModel.email == email))
return result.scalar_one_or_none()
async def get_user_by_id(db: AsyncSession, user_id: str):
result = await db.execute(select(UserModel).where(UserModel.id == user_id))
return result.scalar_one_or_none()
async def signup_user(
db: AsyncSession, name: str, email: str, phone_number: str, password: str
) -> UserModel:
existing = await get_user_by_email(db, email)
if existing:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="User with this email already exists",
)
user = UserModel(
name=name,
email=email,
phone_number=phone_number,
password_hash=hash_password(password),
)
db.add(user)
await db.flush()
await db.refresh(user)
return user
async def login_user(db: AsyncSession, email: str, password: str) -> str:
user = await get_user_by_email(db, email)
if not user or not verify_password(password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
token = create_access_token({"sub": str(user.id), "email": user.email})
return token
async def get_current_user(
token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)
) -> UserModel:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
email: str = payload.get("email")
if user_id is None or email is None:
raise credentials_exception
user = await get_user_by_id(db, user_id)
if user is None or user.email != email:
raise credentials_exception
return user
except jwt.PyJWTError:
raise credentials_exception