Skip to main content
Glama
token_extractor.py4.4 kB
"""Extract and validate OAuth tokens from MCP requests. This module provides token extraction and validation using Hydra introspection. It replaces the custom OAuth token validation with Hydra's standards-compliant OAuth 2.1 token introspection. """ import logging import sys from pathlib import Path from auth.client_cache import get_or_create_client from server.auth.hydra_validator import HydraToken, validate_token # Add project root to path to enable importing from server package PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) # Import client cache for performance optimization logger = logging.getLogger(__name__) def extract_bearer_token(authorization_header: str | None) -> str: """Extract Bearer token from Authorization header. Args: authorization_header: Authorization header value Returns: Bearer token string Raises: ValueError: If header invalid or missing token """ if not authorization_header: raise ValueError("Missing Authorization header") if not authorization_header.startswith("Bearer "): raise ValueError( "Invalid Authorization header format. Expected 'Bearer <token>'" ) token = authorization_header.replace("Bearer ", "", 1).strip() if not token: raise ValueError("Empty Bearer token") return token async def extract_token_context( request, required_scopes: list[str] | None = None ) -> dict: """Extract and validate token from MCP request using Hydra. This function: 1. Extracts Bearer token from Authorization header 2. Validates token via Hydra introspection 3. Extracts project context from token claims 4. Validates required scopes Args: request: FastAPI/MCP request object required_scopes: Optional list of required scopes (e.g., ["memories:read"]) Returns: Dict with user_id, project_id, organization_id, scopes Raises: ValueError: If token invalid or missing required data """ # Get Authorization header auth_header = request.headers.get("authorization") # Extract Bearer token token = extract_bearer_token(auth_header) # Validate with Hydra hydra_token: HydraToken = validate_token(token) # Check required scopes if required_scopes: missing_scopes = [s for s in required_scopes if s not in hydra_token.scopes] if missing_scopes: error_msg = f"Token missing required scopes: {missing_scopes}" logger.warning( f"{error_msg}: token_scopes={hydra_token.scopes}, " f"required={required_scopes}" ) raise ValueError(error_msg) # Extract project context if not hydra_token.project_id: error_msg = "Token missing project context" logger.error( f"{error_msg}: subject={hydra_token.subject}, scopes={hydra_token.scopes}" ) raise ValueError(error_msg) logger.info( f"✅ Token context extracted: user={hydra_token.subject}, " f"project={hydra_token.project_id}, scopes={hydra_token.scopes}" ) return { "user_id": hydra_token.subject, "project_id": hydra_token.project_id, "organization_id": hydra_token.organization_id, "scopes": hydra_token.scopes, } def create_project_client(project_id: str, oauth_token: str, host: str): """Create SelfMemory client configured for specific project using OAuth token. Uses client caching to avoid creating new connections on every request, significantly improving performance by reusing TCP/TLS connections. Args: project_id: Project ID from OAuth token claims oauth_token: OAuth 2.1 access token (validated by middleware) host: SelfMemory API host Returns: Configured SelfMemoryClient instance with OAuth authentication (cached) """ from selfmemory import SelfMemoryClient # Use helper function to eliminate DRY violation def create_client(): client = SelfMemoryClient(oauth_token=oauth_token, host=host) client.project_id = project_id logger.info(f"✅ Created new SelfMemoryClient for project {project_id}") return client return get_or_create_client(oauth_token, create_client)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shrijayan/SelfMemory'

If you have feedback or need assistance with the MCP directory API, please join our Discord server