Skip to main content
Glama
security.py (7.61 kB)
"""Security routes.

This module provides endpoints for authentication and authorization.
"""

import uuid
from datetime import datetime, timedelta, timezone
from typing import Any

from fastapi import APIRouter, Depends, HTTPException, status
from jose import JWTError, jwt
from passlib.context import CryptContext

from src.api.dependencies import CurrentUser
from src.core.config import Settings, get_settings
from src.models.schemas import TokenResponse, UserCreate, UserLogin

router = APIRouter(prefix="/auth")

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Simple in-memory user storage (would use database in production).
# Keyed by email; each value is the user record built in ``register``.
_users: dict[str, dict[str, Any]] = {}

# Claims copied from a refresh token into the access token issued by
# the ``/refresh`` endpoint.
_TOKEN_CLAIMS = ("sub", "uuid", "email", "username", "role")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a password against a hash.

    Args:
        plain_password: Password as supplied by the caller.
        hashed_password: Stored hash to check against.

    Returns:
        True when the password matches the hash.
    """
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Hash a password.

    Args:
        password: Plain-text password.

    Returns:
        Hash produced by the module-level ``pwd_context``.
    """
    return pwd_context.hash(password)


def create_access_token(
    data: dict[str, Any],
    settings: Settings,
    expires_delta: timedelta | None = None,
) -> str:
    """Create a JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        settings: Application settings (secret key, algorithm, default TTL).
        expires_delta: Optional lifetime override. When ``None`` the
            lifetime is ``settings.access_token_expire_minutes`` minutes.

    Returns:
        Encoded JWT carrying ``exp`` and ``type == "access"``.
    """
    # Compare against None explicitly: a zero timedelta is falsy and would
    # otherwise be silently replaced by the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.access_token_expire_minutes)

    to_encode = data.copy()
    # Timezone-aware UTC; datetime.utcnow() is deprecated and returns a
    # naive value.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)


def create_refresh_token(
    data: dict[str, Any],
    settings: Settings,
) -> str:
    """Create a JWT refresh token.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        settings: Application settings (secret key, algorithm, refresh TTL).

    Returns:
        Encoded JWT carrying ``exp`` and ``type == "refresh"``.
    """
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(
        days=settings.refresh_token_expire_days
    )
    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)


@router.post(
    "/register",
    status_code=status.HTTP_201_CREATED,
    summary="Register User",
    description="Register a new user account.",
)
async def register(
    data: UserCreate,
    settings: Settings = Depends(get_settings),
) -> dict[str, Any]:
    """Register a new user.

    Args:
        data: User registration data.
        settings: Application settings (currently unused; kept so the
            endpoint signature stays stable).

    Returns:
        Created user information (uuid, email, username, role).

    Raises:
        HTTPException: 409 when the email is already registered.
    """
    if data.email in _users:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Email already registered",
        )

    user_id = str(uuid.uuid4())
    # NOTE(review): the role is taken verbatim from the request payload;
    # confirm that self-assigned roles are intended.
    user = {
        # Sequential id; relies on records never being removed from _users.
        "id": len(_users) + 1,
        "uuid": user_id,
        "email": data.email,
        "username": data.username,
        "hashed_password": get_password_hash(data.password),
        "role": data.role.value,
        "is_active": True,
        "is_verified": False,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    _users[data.email] = user

    return {
        "uuid": user_id,
        "email": data.email,
        "username": data.username,
        "role": data.role.value,
    }


@router.post(
    "/login",
    summary="Login",
    description="Authenticate and get access token.",
)
async def login(
    data: UserLogin,
    settings: Settings = Depends(get_settings),
) -> TokenResponse:
    """Login and get tokens.

    Args:
        data: Login credentials.
        settings: Application settings.

    Returns:
        Access and refresh tokens.

    Raises:
        HTTPException: 401 when the email is unknown or the password does
            not match; 403 when the account is not active.
    """
    user = _users.get(data.email)
    # Same response for "unknown email" and "wrong password" so the detail
    # does not reveal which accounts exist.
    if not user or not verify_password(data.password, user["hashed_password"]):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
        )

    if not user["is_active"]:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account is disabled",
        )

    token_data = {
        "sub": str(user["id"]),
        "uuid": user["uuid"],
        "email": user["email"],
        "username": user["username"],
        "role": user["role"],
    }

    access_token = create_access_token(token_data, settings)
    refresh_token = create_refresh_token(token_data, settings)

    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
        expires_in=settings.access_token_expire_minutes * 60,
    )


@router.post(
    "/refresh",
    summary="Refresh Token",
    description="Refresh access token using refresh token.",
)
async def refresh_token(
    refresh_token: str,
    settings: Settings = Depends(get_settings),
) -> TokenResponse:
    """Refresh access token.

    Args:
        refresh_token: Refresh token.
        settings: Application settings.

    Returns:
        New access token together with the unchanged refresh token.

    Raises:
        HTTPException: 401 when the token cannot be decoded, is not a
            refresh token, or lacks a required claim.
    """
    # NOTE(review): a plain ``str`` parameter is presumably bound from the
    # query string by FastAPI; consider moving the token to the request body.

    # Only the decode step sits inside the try block, so the HTTPExceptions
    # raised below reach the client with their own detail instead of being
    # caught and re-wrapped.
    try:
        payload = jwt.decode(
            refresh_token,
            settings.secret_key,
            algorithms=[settings.algorithm],
        )
    except JWTError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid refresh token: {exc}",
        ) from exc

    if payload.get("type") != "refresh":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token type",
        )

    try:
        token_data = {claim: payload[claim] for claim in _TOKEN_CLAIMS}
    except KeyError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid refresh token: missing claim {exc}",
        ) from exc

    access_token = create_access_token(token_data, settings)

    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
        expires_in=settings.access_token_expire_minutes * 60,
    )


@router.get(
    "/me",
    summary="Get Current User",
    description="Get the currently authenticated user.",
)
async def get_current_user_info(current_user: CurrentUser) -> dict[str, Any]:
    """Get current user info.

    Args:
        current_user: Current authenticated user.

    Returns:
        User information (id, uuid, email, username, role).
    """
    return {
        "id": current_user.id,
        "uuid": current_user.uuid,
        "email": current_user.email,
        "username": current_user.username,
        "role": current_user.role.value,
    }


@router.post(
    "/logout",
    summary="Logout",
    description="Logout and invalidate tokens.",
)
async def logout() -> dict[str, bool]:
    """Logout user.

    Returns:
        Logout confirmation.
    """
    # In production, would invalidate the token
    return {"logged_out": True}


@router.post(
    "/change-password",
    summary="Change Password",
    description="Change the current user's password.",
)
async def change_password(
    current_password: str,
    new_password: str,
    current_user: CurrentUser,
) -> dict[str, bool]:
    """Change password.

    Args:
        current_password: Current password.
        new_password: New password.
        current_user: Current authenticated user.

    Returns:
        Confirmation.

    Raises:
        HTTPException: 404 when no stored user has the caller's id; 401
            when the current password does not match.
    """
    # NOTE(review): plain ``str`` parameters are presumably bound from the
    # query string by FastAPI; consider moving both passwords to the request
    # body so they do not end up in URLs.

    # _users is keyed by email, so the record is located by scanning for
    # the authenticated user's id.
    user = next(
        (record for record in _users.values() if record["id"] == current_user.id),
        None,
    )
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    if not verify_password(current_password, user["hashed_password"]):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Current password is incorrect",
        )

    user["hashed_password"] = get_password_hash(new_password)
    return {"password_changed": True}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Fu-Jie/MCP-OPENAPI-DOCX'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.