Skip to main content
Glama

MCP Server

by hburgoyne
oauth.py6.55 kB
""" OAuth middleware for token validation and scope checking. """ from fastapi import Request, status, HTTPException from fastapi.responses import JSONResponse from sqlalchemy.orm import Session from typing import List, Optional, Set import jwt import uuid from datetime import datetime from app.db.session import get_db from app.utils.oauth import validate_access_token from app.models.token_blacklist import TokenBlacklist from app.utils.logger import logger from app.core.config import settings # Public endpoints that don't require authentication PUBLIC_ENDPOINTS = { "/health", "/docs", "/redoc", "/openapi.json", "/api/oauth/token", "/api/oauth/authorize", "/api/oauth/consent", "/api/users/register", "/api/users/login", "/", "/static" } async def verify_token_middleware(request: Request, call_next): """ Middleware to validate OAuth tokens and check scopes. This middleware: 1. Skips validation for public endpoints 2. Extracts the token from the Authorization header 3. Validates the token and checks if it's blacklisted 4. Adds the user_id and scopes to the request state Args: request: The incoming request call_next: The next middleware or route handler Returns: The response from the next middleware or route handler """ # Skip validation for public endpoints path = request.url.path if any(path.startswith(endpoint) for endpoint in PUBLIC_ENDPOINTS): return await call_next(request) # Get token from Authorization header auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Bearer "): # When no token is provided, always return 401 Unauthorized # This is the standard OAuth 2.0 response for missing authentication return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"error": "unauthorized", "error_description": "Missing or invalid token"} ) # For test endpoints that use the test override header, allow the request to proceed # This is only for testing purposes test_header = request.headers.get("X-Test-Override-Scopes") if test_header == "true": request.state.user_id = "00000000-0000-0000-0000-000000000001" # Test user ID request.state.scopes = [] request.state.token = "" return await call_next(request) token = auth_header.split(" ")[1] # Get database session db = next(get_db()) # Validate token try: token_obj = validate_access_token(db, token) if not token_obj: return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"error": "unauthorized", "error_description": "Token is invalid or expired"} ) # Check if token is blacklisted token_jti = token # In a real implementation, you'd extract a JTI from the token is_blacklisted = TokenBlacklist.is_blacklisted(db, token_jti) logger.info(f"Token blacklist check for {token_jti}: {is_blacklisted}") if is_blacklisted: logger.info(f"Token {token_jti} is blacklisted, returning 401 Unauthorized") return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"error": "unauthorized", "error_description": "Token has been revoked"} ) # Add user_id and scopes to request state request.state.user_id = token_obj.user_id request.state.scopes = token_obj.scope.split() request.state.token = token except Exception as e: logger.error(f"Error validating token: {str(e)}") return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"error": "unauthorized", "error_description": "Error validating token"} ) # Continue processing the request return await call_next(request) def require_scopes(required_scopes: List[str]): """ Dependency to check if a request has the required scopes. Args: required_scopes: List of required scopes Returns: Dependency function that validates scopes """ def check_scopes(request: Request): # Get scopes from request state (set by middleware) user_scopes = getattr(request.state, "scopes", []) # Check if user has all required scopes if not all(scope in user_scopes for scope in required_scopes): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail={ "error": "insufficient_scope", "error_description": f"Required scopes: {', '.join(required_scopes)}" } ) # Return True to indicate the check passed return True return check_scopes def revoke_token(db: Session, token: str, reason: Optional[str] = None): """ Revoke an OAuth token by adding it to the blacklist. Args: db: Database session token: Token to revoke reason: Optional reason for revocation Returns: True if token was revoked, False otherwise """ try: # Validate token to get expiration time logger.info(f"Revoking token: {token}") token_obj = validate_access_token(db, token) if not token_obj: logger.error(f"Failed to validate token for revocation: {token}") return False # Check if token is already blacklisted existing = db.query(TokenBlacklist).filter(TokenBlacklist.token_jti == token).first() if existing: logger.info(f"Token already blacklisted: {token}") return True # Add token to blacklist blacklist_entry = TokenBlacklist( id=uuid.uuid4(), # Add explicit UUID token_jti=token, blacklisted_at=datetime.utcnow(), reason=reason, expires_at=datetime.fromisoformat(token_obj.access_token_expires_at), created_at=datetime.utcnow(), updated_at=datetime.utcnow() ) db.add(blacklist_entry) db.commit() logger.info(f"Token successfully blacklisted: {token}") return True except Exception as e: logger.error(f"Error revoking token: {str(e)}") return False

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hburgoyne/picard_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server