Skip to main content
Glama
nesirat

MCP Vulnerability Management System

by nesirat
tokens.py3.62 kB
from datetime import datetime, timedelta from typing import Optional, Union from jose import JWTError, jwt from fastapi import HTTPException, status from sqlalchemy.orm import Session from app.models import User import logging # Configure logging logger = logging.getLogger(__name__) class TokenManager: def __init__(self, secret_key: str, algorithm: str = "HS256"): self.secret_key = secret_key self.algorithm = algorithm self.access_token_expire_minutes = 30 self.refresh_token_expire_days = 30 def create_access_token( self, data: dict, remember: bool = False, expires_delta: Optional[timedelta] = None ) -> str: """ Create a new access token with optional remember functionality """ try: to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: if remember: expire = datetime.utcnow() + timedelta(days=self.refresh_token_expire_days) else: expire = datetime.utcnow() + timedelta(minutes=self.access_token_expire_minutes) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm) logger.info(f"Access token created successfully for user {data.get('sub')}") return encoded_jwt except Exception as e: logger.error(f"Error creating access token: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error creating access token" ) def verify_token(self, token: str) -> dict: """ Verify and decode a JWT token """ try: payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm]) return payload except JWTError as e: logger.error(f"Token verification failed: {str(e)}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) def get_current_user(self, token: str, db: Session) -> User: """ Get the current user from the token """ try: payload = self.verify_token(token) username: str = payload.get("sub") if username is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) user = db.query(User).filter(User.email == username).first() if user is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) if not user.is_active: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user" ) return user except Exception as e: logger.error(f"Error getting current user: {str(e)}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nesirat/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server