Skip to main content
Glama
auth.py (6.15 kB)
"""Authentication and authorization utilities""" from datetime import datetime, timedelta from typing import Optional import jwt from jwt.exceptions import PyJWTError as JWTError from passlib.context import CryptContext from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from .config import settings from .database import get_db from .models import User from .schemas import TokenData # Password hashing pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # HTTP Bearer token scheme (auto_error=False makes it optional) security = HTTPBearer(auto_error=False) def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Hash a password (bcrypt has 72 byte limit)""" # Ensure password is within bcrypt's 72 byte limit if len(password.encode('utf-8')) > 72: raise ValueError("Password cannot exceed 72 bytes") return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """ Create a JWT access token Args: data: Data to encode in the token expires_delta: Optional expiration time delta Returns: Encoded JWT token """ to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, settings.jwt_secret_key, algorithm=settings.jwt_algorithm) return encoded_jwt def verify_token(token: str) -> TokenData: """ Verify and decode a JWT token Args: token: JWT token to verify Returns: TokenData with user information Raises: HTTPException: If token is invalid """ credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", 
headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm]) email: str = payload.get("sub") if email is None: raise credentials_exception token_data = TokenData(email=email) return token_data except JWTError: raise credentials_exception def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ) -> User: """ Get the current authenticated user from the JWT token Args: credentials: HTTP Bearer credentials from request db: Database session Returns: Current user object Raises: HTTPException: If user is not found or token is invalid """ token = credentials.credentials token_data = verify_token(token) user = db.query(User).filter(User.email == token_data.email).first() if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found", headers={"WWW-Authenticate": "Bearer"}, ) return user def get_optional_user( credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), db: Session = Depends(get_db) ) -> Optional[User]: """ Get the current user if authentication is enabled and token is provided. Returns None if authentication is disabled. Args: credentials: HTTP Bearer credentials from request (optional) db: Database session Returns: Current user object if authenticated, None if auth is disabled Raises: HTTPException: If auth is required and token is invalid """ # If authentication is not required, return None (no user context) if not settings.require_auth: return None # If authentication is required but no credentials provided if credentials is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required. 
Please provide a valid Bearer token.", headers={"WWW-Authenticate": "Bearer"}, ) # Validate the token token = credentials.credentials token_data = verify_token(token) user = db.query(User).filter(User.email == token_data.email).first() if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found", headers={"WWW-Authenticate": "Bearer"}, ) return user def authenticate_user(db: Session, email: str, password: str) -> Optional[User]: """ Authenticate a user with email and password Args: db: Database session email: User email password: Plain text password Returns: User object if authentication successful, None otherwise """ user = db.query(User).filter(User.email == email).first() if not user: return None if not verify_password(password, user.hashed_password): return None return user def create_user(db: Session, email: str, password: str, full_name: Optional[str] = None) -> User: """ Create a new user Args: db: Database session email: User email password: Plain text password full_name: Optional full name Returns: Created user object Raises: ValueError: If user with email already exists """ # Check if user exists existing_user = db.query(User).filter(User.email == email).first() if existing_user: raise ValueError("User with this email already exists") # Create new user hashed_password = get_password_hash(password) user = User( email=email, hashed_password=hashed_password, full_name=full_name ) db.add(user) db.commit() db.refresh(user) return user

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cwente25/KnowledgeBaseMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.