Skip to main content
Glama
nesirat

MCP Vulnerability Management System

by nesirat
auth.py (4.1 kB)
"""Authentication and self-service account endpoints.

Defines ``POST /register``, ``POST /login``, ``GET /me`` and ``PUT /me`` on
``router``; the paths are relative to wherever the router is mounted.
"""

from datetime import timedelta
from typing import Any

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from app.core.auth import (
    create_access_token,
    get_current_active_user,
    get_password_hash,
    verify_password,
)
from app.core.config import settings
from app.db.session import get_db
from app.models.user import User
from app.schemas.user import User as UserSchema
from app.schemas.user import UserCreate, UserUpdate

router = APIRouter()

_EMAIL_TAKEN = "The user with this email already exists in the system."
_USERNAME_TAKEN = "The user with this username already exists in the system."
_NOT_ENOUGH_PRIVILEGES = "The user doesn't have enough privileges"


def _ensure_unique(db: Session, column: Any, value: Any, detail: str) -> None:
    """Raise HTTP 400 with *detail* if a user row already has ``column == value``."""
    if db.query(User).filter(column == value).first() is not None:
        raise HTTPException(status_code=400, detail=detail)


@router.post("/register", response_model=UserSchema)
def register(
    *,
    db: Session = Depends(get_db),
    user_in: UserCreate,
) -> Any:
    """
    Register a new user.

    The endpoint has no authentication dependency, so the caller is
    untrusted: a payload asking for ``is_superuser`` is rejected instead of
    being copied onto the new row.

    Raises:
        HTTPException 403: the payload requests superuser status.
        HTTPException 400: the email or the username is already in use.
    """
    # SECURITY: never let an anonymous request mint an administrator account.
    if user_in.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=_NOT_ENOUGH_PRIVILEGES,
        )

    _ensure_unique(db, User.email, user_in.email, _EMAIL_TAKEN)
    _ensure_unique(db, User.username, user_in.username, _USERNAME_TAKEN)

    user = User(
        email=user_in.email,
        username=user_in.username,
        hashed_password=get_password_hash(user_in.password),
        is_active=user_in.is_active,
        is_superuser=False,
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


@router.post("/login")
def login(
    db: Session = Depends(get_db),
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
    """
    OAuth2 compatible token login, get an access token for future requests.

    Returns:
        A dict with ``access_token`` (subject is the username, lifetime is
        ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes) and ``token_type``.

    Raises:
        HTTPException 401: unknown username or password mismatch.
        HTTPException 400: the credentials are valid but the user is inactive.
    """
    user = db.query(User).filter(User.username == form_data.username).first()
    # Same response for "no such user" and "wrong password" so the message
    # does not reveal which usernames exist.
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
        )

    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    return {
        "access_token": create_access_token(
            data={"sub": user.username}, expires_delta=access_token_expires
        ),
        "token_type": "bearer",
    }


@router.get("/me", response_model=UserSchema)
def read_users_me(current_user: User = Depends(get_current_active_user)) -> Any:
    """
    Get current user.
    """
    return current_user


@router.put("/me", response_model=UserSchema)
def update_user_me(
    *,
    db: Session = Depends(get_db),
    user_in: UserUpdate,
    current_user: User = Depends(get_current_active_user),
) -> Any:
    """
    Update own user.

    Only fields present in the payload are applied: ``password``, ``email``
    and ``username`` when truthy, ``is_active`` and ``is_superuser`` when not
    ``None``. A user who is not already a superuser cannot grant themselves
    that status.

    Raises:
        HTTPException 403: a non-superuser requests ``is_superuser=True``.
        HTTPException 400: the new email or username belongs to another user.
    """
    # SECURITY: check privileges before touching the ORM object so a rejected
    # request leaves ``current_user`` unmodified.
    if user_in.is_superuser and not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=_NOT_ENOUGH_PRIVILEGES,
        )

    # Uniqueness only needs checking when the value actually changes.
    if user_in.email and user_in.email != current_user.email:
        _ensure_unique(db, User.email, user_in.email, _EMAIL_TAKEN)
    if user_in.username and user_in.username != current_user.username:
        _ensure_unique(db, User.username, user_in.username, _USERNAME_TAKEN)

    if user_in.password:
        current_user.hashed_password = get_password_hash(user_in.password)
    if user_in.email:
        current_user.email = user_in.email
    if user_in.username:
        current_user.username = user_in.username
    if user_in.is_active is not None:
        current_user.is_active = user_in.is_active
    if user_in.is_superuser is not None:
        # Reaching here means the caller is already a superuser or the
        # requested value is False.
        current_user.is_superuser = user_in.is_superuser

    db.add(current_user)
    db.commit()
    db.refresh(current_user)
    return current_user

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nesirat/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.