Skip to main content
Glama
auth_code_store.py•7.49 kB
""" Authorization Code Store for OAuth 2.1 Manages authorization codes and PKCE validation """ import uuid import threading import time import hashlib import base64 from typing import Dict, Optional class AuthorizationCode: """Represents an authorization code with associated data""" def __init__( self, code: str, google_access_token: str, google_id_token: str, user_info: dict, client_id: str, code_challenge: str, code_challenge_method: str, redirect_uri: str, scope: str, expires_at: float ): self.code = code self.google_access_token = google_access_token self.google_id_token = google_id_token self.user_info = user_info self.client_id = client_id self.code_challenge = code_challenge self.code_challenge_method = code_challenge_method self.redirect_uri = redirect_uri self.scope = scope self.expires_at = expires_at self.used = False class AuthCodeStore: """ Thread-safe storage for authorization codes. Implements PKCE validation and code expiration. """ def __init__(self, code_lifetime: int = 600): """ Initialize the authorization code store. Args: code_lifetime: Lifetime of authorization codes in seconds (default: 10 minutes) """ self._codes: Dict[str, AuthorizationCode] = {} self._lock = threading.Lock() self._code_lifetime = code_lifetime def create_code( self, google_access_token: str, google_id_token: str, user_info: dict, client_id: str, code_challenge: str, code_challenge_method: str, redirect_uri: str, scope: str ) -> str: """ Create a new authorization code. Args: google_access_token: Access token from Google google_id_token: ID token from Google user_info: User information from Google client_id: The client ID requesting the code code_challenge: PKCE code challenge code_challenge_method: PKCE method (S256 or plain) redirect_uri: The redirect URI scope: Requested scopes Returns: The generated authorization code """ with self._lock: # Generate unique authorization code code = f"code_{uuid.uuid4().hex}" # Create authorization code object auth_code = AuthorizationCode( code=code, google_access_token=google_access_token, google_id_token=google_id_token, user_info=user_info, client_id=client_id, code_challenge=code_challenge, code_challenge_method=code_challenge_method, redirect_uri=redirect_uri, scope=scope, expires_at=time.time() + self._code_lifetime ) # Store it self._codes[code] = auth_code print(f"[AuthCodeStore] Created authorization code for client {client_id}") return code def exchange_code( self, code: str, code_verifier: str, client_id: str, redirect_uri: str ) -> Optional[AuthorizationCode]: """ Exchange an authorization code for token data. Validates PKCE code_verifier and ensures code hasn't been used. Args: code: The authorization code code_verifier: PKCE code verifier client_id: The client ID redirect_uri: The redirect URI (must match original) Returns: AuthorizationCode if valid, None otherwise """ with self._lock: auth_code = self._codes.get(code) if not auth_code: print(f"[AuthCodeStore] Code not found: {code}") return None # Check if already used if auth_code.used: print(f"[AuthCodeStore] Code already used: {code}") # Security: invalidate all tokens for this client del self._codes[code] return None # Check expiration if time.time() > auth_code.expires_at: print(f"[AuthCodeStore] Code expired: {code}") del self._codes[code] return None # Validate client ID if auth_code.client_id != client_id: print(f"[AuthCodeStore] Client ID mismatch for code: {code}") return None # Validate redirect URI if auth_code.redirect_uri != redirect_uri: print(f"[AuthCodeStore] Redirect URI mismatch for code: {code}") return None # Validate PKCE code_verifier if not self._validate_pkce(code_verifier, auth_code.code_challenge, auth_code.code_challenge_method): print(f"[AuthCodeStore] PKCE validation failed for code: {code}") return None # Mark as used and return auth_code.used = True print(f"[AuthCodeStore] Code exchanged successfully for client {client_id}") return auth_code def _validate_pkce(self, code_verifier: str, code_challenge: str, method: str) -> bool: """ Validate PKCE code_verifier against code_challenge. Args: code_verifier: The code verifier from token request code_challenge: The code challenge from authorization request method: The challenge method (S256 or plain) Returns: True if valid, False otherwise """ if method == "S256": # Hash the verifier with SHA256 and base64url encode verifier_hash = hashlib.sha256(code_verifier.encode('ascii')).digest() computed_challenge = base64.urlsafe_b64encode(verifier_hash).decode('ascii').rstrip('=') return computed_challenge == code_challenge elif method == "plain": # Plain method: verifier must equal challenge return code_verifier == code_challenge else: print(f"[AuthCodeStore] Unknown PKCE method: {method}") return False def cleanup_expired(self) -> int: """ Remove expired authorization codes. Returns: Number of codes removed """ with self._lock: now = time.time() expired = [ code for code, auth_code in self._codes.items() if now > auth_code.expires_at or auth_code.used ] for code in expired: del self._codes[code] if expired: print(f"[AuthCodeStore] Cleaned up {len(expired)} expired/used codes") return len(expired) def count(self) -> int: """Get total number of active authorization codes""" with self._lock: return len(self._codes) # Global authorization code store instance auth_code_store = AuthCodeStore()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/GeneralJerel/ChessMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server