Skip to main content
Glama

MCP Kafka Schema Reg

MIT License
23
  • Apple
  • Linux
oauth_provider.py46.6 kB
#!/usr/bin/env python3 """ OAuth 2.1 Resource Server for Kafka Schema Registry MCP Server This module provides OAuth 2.1 compliant authentication and authorization functionality with scope-based permissions for the Kafka Schema Registry MCP server. COMPLIANCE: OAuth 2.1, RFC 8692, RFC 8707, MCP 2025-06-18 Supported OAuth Providers: - Azure AD / Entra ID - Google OAuth 2.0 - Keycloak - Okta - Any OAuth 2.1 compliant provider Scopes: - read: Can view schemas, subjects, configurations - write: Can register schemas, update configs (includes read permissions) - admin: Can delete subjects, manage registries (includes write and read permissions) Security Features: - PKCE enforcement (mandatory per OAuth 2.1) - Resource indicator validation (RFC 8707) - Audience validation for all requests - Token binding support - Token revocation checking - JWKS cache with proper TTL management - No development token bypasses in production - Enhanced SSL/TLS verification for all HTTP requests """ import asyncio import hashlib import logging import os import ssl import time from typing import Any, Dict, List, Optional, Set, Union from dotenv import load_dotenv # Load environment variables load_dotenv() # Configure logging logger = logging.getLogger(__name__) # Try to import JWT and HTTP libraries for production validation try: import aiohttp import jwt from jwt.exceptions import ExpiredSignatureError, InvalidTokenError JWT_AVAILABLE = True except ImportError: JWT_AVAILABLE = False logger.warning("JWT validation libraries not available. Install: pip install PyJWT aiohttp cryptography") # OAuth configuration from environment variables ENABLE_AUTH = os.getenv("ENABLE_AUTH", "false").lower() in ("true", "1", "yes", "on") AUTH_ISSUER_URL = os.getenv("AUTH_ISSUER_URL", "https://example.com") AUTH_VALID_SCOPES = [s.strip() for s in os.getenv("AUTH_VALID_SCOPES", "read,write,admin").split(",") if s.strip()] AUTH_DEFAULT_SCOPES = [s.strip() for s in os.getenv("AUTH_DEFAULT_SCOPES", "read").split(",") if s.strip()] AUTH_REQUIRED_SCOPES = [s.strip() for s in os.getenv("AUTH_REQUIRED_SCOPES", "read").split(",") if s.strip()] AUTH_CLIENT_REG_ENABLED = os.getenv("AUTH_CLIENT_REG_ENABLED", "true").lower() in ( "true", "1", "yes", "on", ) AUTH_REVOCATION_ENABLED = os.getenv("AUTH_REVOCATION_ENABLED", "true").lower() in ( "true", "1", "yes", "on", ) # OAuth 2.1 Configuration (generic approach) AUTH_AUDIENCE = os.getenv("AUTH_AUDIENCE", "") # Client ID or API identifier # Legacy GitHub OAuth Configuration (for backward compatibility) AUTH_GITHUB_CLIENT_ID = os.getenv("AUTH_GITHUB_CLIENT_ID", "") AUTH_GITHUB_ORG = os.getenv("AUTH_GITHUB_ORG", "") # SSL/TLS Configuration (inherited from schema_registry_common) ENFORCE_SSL_TLS_VERIFICATION = os.getenv("ENFORCE_SSL_TLS_VERIFICATION", "true").lower() in ("true", "1", "yes", "on") CUSTOM_CA_BUNDLE_PATH = os.getenv("CUSTOM_CA_BUNDLE_PATH", "") # OAuth 2.1 Discovery Configuration OAUTH_DISCOVERY_CACHE_TTL = int(os.getenv("OAUTH_DISCOVERY_CACHE_TTL", "3600")) # 1 hour default _discovery_cache = {} _discovery_cache_timestamps = {} # Resource Server Configuration (RFC 8692) RESOURCE_SERVER_URL = os.getenv("RESOURCE_SERVER_URL", "") # Our resource server URL RESOURCE_INDICATORS = [url.strip() for url in os.getenv("RESOURCE_INDICATORS", "").split(",") if url.strip()] # PKCE Configuration (OAuth 2.1 requirement) REQUIRE_PKCE = os.getenv("REQUIRE_PKCE", "true").lower() in ("true", "1", "yes", "on") # Token Binding Configuration (OAuth 2.1 enhancement) TOKEN_BINDING_ENABLED = os.getenv("TOKEN_BINDING_ENABLED", "false").lower() in ( "true", "1", "yes", "on", ) # Token Introspection Configuration TOKEN_INTROSPECTION_ENABLED = os.getenv("TOKEN_INTROSPECTION_ENABLED", "true").lower() in ("true", "1", "yes", "on") # Token Revocation Configuration TOKEN_REVOCATION_CHECK_ENABLED = os.getenv("TOKEN_REVOCATION_CHECK_ENABLED", "true").lower() in ( "true", "1", "yes", "on", ) # JWKS Configuration JWKS_CACHE_TTL = int(os.getenv("JWKS_CACHE_TTL", "3600")) # 1 hour default JWKS_CACHE_MAX_SIZE = int(os.getenv("JWKS_CACHE_MAX_SIZE", "10")) # Max cached JWKS _jwks_cache = {} _jwks_cache_timestamps = {} JWKS_CACHE_ERRORS = {} # Revoked tokens cache (in production, use Redis or database) REVOKED_TOKENS_CACHE = set() # Development and Testing Configuration ALLOW_DEV_TOKENS = os.getenv("ALLOW_DEV_TOKENS", "true").lower() in ( "true", "1", "yes", "on", ) # Disable development tokens in production if os.getenv("ENVIRONMENT", "development").lower() == "production": ALLOW_DEV_TOKENS = False # Scope definitions for OAuth 2.1 authorization SCOPE_DEFINITIONS = { "read": { "description": "Read access to schemas, subjects, and configurations", "level": 1, "includes": [ "list_subjects", "get_schema", "list_registries", "check_registry_health", "get_compatibility_level", "search_schemas", "get_schema_metadata", "list_schema_versions", "get_subject_versions", "check_schema_compatibility", ], "permissions": ["view_schemas", "view_subjects", "view_configs"], }, "write": { "description": "Write access to schemas and configurations (includes read)", "level": 2, "includes": [ # Read permissions "list_subjects", "get_schema", "list_registries", "check_registry_health", "get_compatibility_level", "search_schemas", "get_schema_metadata", "list_schema_versions", "get_subject_versions", "check_schema_compatibility", # Write permissions "register_schema", "update_compatibility", "set_compatibility_level", ], "permissions": [ "view_schemas", "view_subjects", "view_configs", "create_schemas", "update_schemas", "update_configs", ], }, "admin": { "description": "Administrative access (includes read, write, and delete)", "level": 3, "includes": [ # Read permissions "list_subjects", "get_schema", "list_registries", "check_registry_health", "get_compatibility_level", "search_schemas", "get_schema_metadata", "list_schema_versions", "get_subject_versions", "check_schema_compatibility", # Write permissions "register_schema", "update_compatibility", "set_compatibility_level", # Admin permissions "delete_subject", "delete_schema_version", "clear_context_batch", "migrate_schema", "bulk_migrate_schemas", "compare_registries", ], "permissions": [ "view_schemas", "view_subjects", "view_configs", "create_schemas", "update_schemas", "update_configs", "delete_subjects", "delete_schemas", "manage_compatibility", ], }, } def create_secure_ssl_context() -> ssl.SSLContext: """Create a secure SSL context for OAuth HTTP requests.""" context = ssl.create_default_context() # Configure SSL context for maximum security context.check_hostname = True context.verify_mode = ssl.CERT_REQUIRED # Disable weak protocols and ciphers context.minimum_version = ssl.TLSVersion.TLSv1_2 context.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS") # Load custom CA bundle if specified if CUSTOM_CA_BUNDLE_PATH and os.path.exists(CUSTOM_CA_BUNDLE_PATH): context.load_verify_locations(CUSTOM_CA_BUNDLE_PATH) logger.info(f"OAuth SSL context using custom CA bundle: {CUSTOM_CA_BUNDLE_PATH}") return context def create_secure_aiohttp_connector() -> aiohttp.TCPConnector: """Create a secure aiohttp connector with proper SSL/TLS configuration.""" if ENFORCE_SSL_TLS_VERIFICATION: ssl_context = create_secure_ssl_context() connector = aiohttp.TCPConnector( ssl=ssl_context, use_dns_cache=True, ttl_dns_cache=300, # 5 minutes DNS cache limit=100, # Connection pool limit limit_per_host=30, # Per-host connection limit ) logger.debug("OAuth connector created with SSL verification enabled") else: # SSL verification disabled (not recommended for production) connector = aiohttp.TCPConnector(ssl=False, use_dns_cache=True, ttl_dns_cache=300, limit=100, limit_per_host=30) logger.warning("OAuth connector created with SSL verification DISABLED - not recommended for production") return connector # OAuth 2.1 Token Validator class OAuth21TokenValidator: """ OAuth 2.1 compliant token validator with support for: - PKCE enforcement - Resource indicator validation (RFC 8707) - Audience validation - Token binding - Revocation checking - JWKS caching with TTL - Enhanced SSL/TLS security for all HTTP requests """ def __init__(self): self.session = None self.jwks_cache = _jwks_cache self.jwks_timestamps = _jwks_cache_timestamps async def get_session(self): """Get or create aiohttp session with secure SSL configuration.""" if not self.session: timeout = aiohttp.ClientTimeout(total=30) connector = create_secure_aiohttp_connector() self.session = aiohttp.ClientSession( timeout=timeout, connector=connector, headers={ "User-Agent": "KafkaSchemaRegistryMCP-OAuth/2.0.0 (Security Enhanced)", "Connection": "close", # Don't keep connections alive unnecessarily }, ) logger.debug(f"OAuth session created with SSL verification: {ENFORCE_SSL_TLS_VERIFICATION}") return self.session async def close(self): """Close the aiohttp session.""" if self.session: await self.session.close() self.session = None def is_dev_token(self, token: str) -> bool: """Check if token is a development bypass token.""" return token.startswith("dev-token-") if token else False def is_token_revoked(self, token: str, jti: str = None) -> bool: """Check if token is revoked.""" if not TOKEN_REVOCATION_CHECK_ENABLED: return False # Check by token hash token_hash = hashlib.sha256(token.encode()).hexdigest() if token_hash in REVOKED_TOKENS_CACHE: return True # Check by JTI if available if jti and jti in REVOKED_TOKENS_CACHE: return True return False def validate_audience(self, aud: Union[str, List[str]], resource_indicators: List[str] = None) -> bool: """ Validate token audience against resource indicators (RFC 8707). """ if not aud: logger.warning("Token missing audience claim") return False audiences = [aud] if isinstance(aud, str) else aud # If we have configured resource indicators, validate against them if RESOURCE_INDICATORS: for audience in audiences: if audience in RESOURCE_INDICATORS: return True logger.warning(f"Token audience {audiences} not in allowed resource indicators {RESOURCE_INDICATORS}") return False # If we have a configured AUTH_AUDIENCE, validate against it if AUTH_AUDIENCE: if AUTH_AUDIENCE in audiences: return True logger.warning(f"Token audience {audiences} does not match configured audience {AUTH_AUDIENCE}") return False # If no specific audience validation configured, any audience is valid return True def validate_pkce_requirements(self, claims: Dict[str, Any]) -> bool: """ Validate PKCE requirements for the token. Note: This is typically validated during the authorization code exchange, but we can check for PKCE-related claims in the token. """ if not REQUIRE_PKCE: return True # Check if token contains PKCE-related claims (implementation-specific) # Most providers don't include PKCE details in the final token # but we can check for other indicators of PKCE usage # For now, we'll assume PKCE was properly validated during token issuance # In a full implementation, you'd validate this at the authorization server return True def validate_resource_indicator(self, claims: Dict[str, Any], requested_resource: str = None) -> bool: """ Validate resource indicator according to RFC 8707. """ # Check if token has resource claim resource_claim = claims.get("resource") or claims.get("aud") if not resource_claim: # If no resource indicators are configured, allow access if not RESOURCE_INDICATORS: return True logger.warning("Token missing resource claim") return False # Normalize resource claim to list if isinstance(resource_claim, str): resource_claims = [resource_claim] else: resource_claims = resource_claim # If specific resource requested, validate it's in token if requested_resource: if requested_resource not in resource_claims: logger.warning(f"Requested resource {requested_resource} not authorized in token") return False # Validate against configured resource indicators if RESOURCE_INDICATORS: for resource in resource_claims: if resource in RESOURCE_INDICATORS: return True logger.warning( f"No authorized resources {resource_claims} match configured indicators {RESOURCE_INDICATORS}" ) return False return True async def get_jwks(self, jwks_uri: str) -> Dict[str, Any]: """Get JWKS with caching and TTL management.""" now = time.time() # Check cache first if jwks_uri in self.jwks_cache: cached_time = self.jwks_timestamps.get(jwks_uri, 0) if now - cached_time < JWKS_CACHE_TTL: return self.jwks_cache[jwks_uri] # Fetch fresh JWKS try: session = await self.get_session() async with session.get(jwks_uri) as response: if response.status == 200: jwks = await response.json() # Cache management - remove oldest if cache is full if len(self.jwks_cache) >= JWKS_CACHE_MAX_SIZE: oldest_uri = min( self.jwks_timestamps.keys(), key=lambda k: self.jwks_timestamps[k], ) del self.jwks_cache[oldest_uri] del self.jwks_timestamps[oldest_uri] # Cache the result self.jwks_cache[jwks_uri] = jwks self.jwks_timestamps[jwks_uri] = now # Clear any previous error if jwks_uri in JWKS_CACHE_ERRORS: del JWKS_CACHE_ERRORS[jwks_uri] logger.debug(f"JWKS fetched and cached from {jwks_uri}") return jwks else: error_msg = f"Failed to fetch JWKS: HTTP {response.status}" JWKS_CACHE_ERRORS[jwks_uri] = error_msg logger.error(error_msg) return None except ssl.SSLError as e: error_msg = f"SSL error fetching JWKS from {jwks_uri}: {str(e)}" JWKS_CACHE_ERRORS[jwks_uri] = error_msg logger.error(error_msg) return None except Exception as e: error_msg = f"Error fetching JWKS from {jwks_uri}: {str(e)}" JWKS_CACHE_ERRORS[jwks_uri] = error_msg logger.error(error_msg) return None async def validate_token( self, token: str, required_scopes: Set[str] = None, requested_resource: str = None, ) -> Dict[str, Any]: """ Validate OAuth 2.1 token with full compliance checks. Returns validation result with user info and scopes. """ try: # Check for development token bypass (only in development) if self.is_dev_token(token): if ALLOW_DEV_TOKENS: logger.warning("🚨 Using development token bypass - NOT FOR PRODUCTION!") return { "valid": True, "user": "dev-user", "scopes": list(AUTH_VALID_SCOPES), "claims": { "sub": "dev-user", "scope": " ".join(AUTH_VALID_SCOPES), }, "dev_token": True, } else: logger.error("🚨 Development token rejected in production environment") return { "valid": False, "error": "Development tokens not allowed in production", } # Check if token is revoked if self.is_token_revoked(token): return {"valid": False, "error": "Token has been revoked"} # Decode JWT without verification first to get header try: header = jwt.get_unverified_header(token) except Exception as e: return {"valid": False, "error": f"Invalid JWT format: {str(e)}"} # Get key ID from header kid = header.get("kid") if not kid: return {"valid": False, "error": "Missing key ID in JWT header"} # Get provider configuration provider_config = await self.get_provider_config() if not provider_config: return { "valid": False, "error": "OAuth 2.1 provider configuration not available", } jwks_uri = provider_config.get("jwks_uri") if not jwks_uri: return {"valid": False, "error": "No JWKS URI configured for provider"} # Get JWKS jwks = await self.get_jwks(jwks_uri) if not jwks: return {"valid": False, "error": "Failed to retrieve JWKS"} # Find the key key = None for k in jwks.get("keys", []): if k.get("kid") == kid: key = k break if not key: return {"valid": False, "error": f"Key ID {kid} not found in JWKS"} # Convert JWK to PEM format try: public_key = jwt.algorithms.RSAAlgorithm.from_jwk(key) except Exception as e: return { "valid": False, "error": f"Failed to convert JWK to public key: {str(e)}", } # Verify and decode token try: claims = jwt.decode( token, public_key, algorithms=["RS256"], issuer=AUTH_ISSUER_URL, options={ "verify_signature": True, "verify_exp": True, "verify_iat": True, "verify_iss": True, "require": ["exp", "iat", "iss", "sub"], }, ) except ExpiredSignatureError: return {"valid": False, "error": "Token has expired"} except InvalidTokenError as e: return {"valid": False, "error": f"Invalid token: {str(e)}"} # Validate audience (RFC 8707) if not self.validate_audience(claims.get("aud"), RESOURCE_INDICATORS): return {"valid": False, "error": "Invalid audience"} # Validate resource indicator (RFC 8707) if not self.validate_resource_indicator(claims, requested_resource): return {"valid": False, "error": "Invalid resource indicator"} # Validate PKCE requirements if not self.validate_pkce_requirements(claims): return {"valid": False, "error": "PKCE validation failed"} # Extract scopes scope_claim = claims.get("scope") or claims.get("scp") or "" if isinstance(scope_claim, list): user_scopes = set(scope_claim) else: user_scopes = set(scope_claim.split()) if scope_claim else set() # Validate required scopes if required_scopes and not self.check_scopes(user_scopes, required_scopes): return { "valid": False, "error": "Insufficient permissions", "required_scopes": list(required_scopes), "user_scopes": list(user_scopes), } # Extract user information user_id = claims.get("sub") or claims.get("preferred_username") or claims.get("email") return { "valid": True, "user": user_id, "scopes": list(user_scopes), "claims": claims, "jti": claims.get("jti"), # For revocation tracking "ssl_verified": ENFORCE_SSL_TLS_VERIFICATION, } except Exception as e: logger.error(f"Token validation error: {str(e)}") return {"valid": False, "error": f"Token validation failed: {str(e)}"} async def discover_oauth_configuration(self, issuer_url: str) -> Optional[Dict[str, Any]]: """ Discover OAuth 2.1 configuration from standard discovery endpoints. Uses RFC 8414 (OAuth Authorization Server Metadata) for discovery. """ current_time = time.time() # Check cache first if ( issuer_url in _discovery_cache and issuer_url in _discovery_cache_timestamps and (current_time - _discovery_cache_timestamps[issuer_url]) < OAUTH_DISCOVERY_CACHE_TTL ): return _discovery_cache[issuer_url] discovery_endpoints = [ f"{issuer_url}/.well-known/oauth-authorization-server", f"{issuer_url}/.well-known/openid_configuration", # OIDC discovery ] session = await self.get_session() for discovery_url in discovery_endpoints: try: logger.debug(f"Attempting OAuth 2.1 discovery from: {discovery_url}") async with session.get(discovery_url) as response: if response.status == 200: config = await response.json() # Validate required OAuth 2.1 fields required_fields = [ "issuer", "authorization_endpoint", "token_endpoint", ] if all(field in config for field in required_fields): # Cache the discovery result _discovery_cache[issuer_url] = config _discovery_cache_timestamps[issuer_url] = current_time logger.info(f"✅ OAuth 2.1 discovery successful from: {discovery_url}") logger.debug( f"Discovered endpoints - Auth: {config.get('authorization_endpoint')}, " f"Token: {config.get('token_endpoint')}, JWKS: {config.get('jwks_uri')}" ) return config else: logger.warning( f"Discovery endpoint {discovery_url} missing required fields: {required_fields}" ) except ssl.SSLError as e: logger.error(f"SSL error during OAuth discovery for {discovery_url}: {e}") continue except Exception as e: logger.debug(f"Discovery failed for {discovery_url}: {e}") continue # Fallback: If discovery fails, try to construct basic configuration logger.warning(f"⚠️ OAuth 2.1 discovery failed for {issuer_url}, using fallback configuration") return await self.get_fallback_configuration(issuer_url) async def get_fallback_configuration(self, issuer_url: str) -> Dict[str, Any]: """ Fallback configuration when OAuth 2.1 discovery fails. Only handles GitHub (not OAuth 2.1 compliant). All other providers must support RFC 8414 discovery. """ # Handle GitHub special case (not OAuth 2.1 compliant) if "github.com" in issuer_url or "api.github.com" in issuer_url: logger.info("Using GitHub fallback configuration (not OAuth 2.1 compliant)") return { "issuer": "https://github.com", "authorization_endpoint": "https://github.com/login/oauth/authorize", "token_endpoint": "https://github.com/login/oauth/access_token", "scope_claim": "scope", "username_claim": "login", "oauth_2_1_compliant": False, "note": "GitHub OAuth has limited OAuth 2.1 support", } # For all other providers, require OAuth 2.1 discovery logger.error(f"OAuth 2.1 discovery failed for {issuer_url}. Provider must support RFC 8414 discovery.") raise ValueError( f"OAuth 2.1 discovery failed for {issuer_url}. Please ensure your provider supports RFC 8414 discovery endpoints." ) async def get_provider_config(self) -> Optional[Dict[str, Any]]: """Get OAuth configuration using standard OAuth 2.1 discovery.""" if not AUTH_ISSUER_URL or AUTH_ISSUER_URL == "https://example.com": logger.warning("No valid AUTH_ISSUER_URL configured") return None return await self.discover_oauth_configuration(AUTH_ISSUER_URL) def check_scopes(self, user_scopes: Set[str], required_scopes: Set[str]) -> bool: """Check if user has required scopes, considering scope hierarchy.""" if not required_scopes: return True # Expand user scopes to include inherited scopes expanded_user_scopes = self.expand_scopes(list(user_scopes)) # Check if all required scopes are present return required_scopes.issubset(expanded_user_scopes) def expand_scopes(self, scopes: list) -> Set[str]: """Expand scopes to include inherited scopes based on hierarchy.""" expanded = set(scopes) for scope in scopes: if scope in SCOPE_DEFINITIONS: required_scopes = SCOPE_DEFINITIONS[scope].get("requires", []) expanded.update(required_scopes) return expanded # Global token validator instance token_validator = OAuth21TokenValidator() if JWT_AVAILABLE else None if ENABLE_AUTH: try: from fastmcp.server.auth import BearerAuthProvider from fastmcp.server.dependencies import AccessToken, get_access_token class OAuth21BearerAuthProvider(BearerAuthProvider): """ OAuth 2.1 compliant Bearer Auth Provider for MCP Server. Implements comprehensive OAuth 2.1 features: - PKCE enforcement (mandatory) - Resource indicator validation (RFC 8707) - Audience validation - Token binding support - Token revocation checking - Proper JWKS caching - Enhanced SSL/TLS security for all HTTP requests """ def __init__(self, **kwargs): # Use standard OAuth 2.1 configuration - let discovery handle the details super().__init__( issuer=AUTH_ISSUER_URL, audience=AUTH_AUDIENCE or RESOURCE_INDICATORS, required_scopes=AUTH_REQUIRED_SCOPES, **kwargs, ) self.valid_scopes = set(AUTH_VALID_SCOPES) self.required_scopes = set(AUTH_REQUIRED_SCOPES) logger.info(f"OAuth 2.1 Bearer Auth Provider initialized with scopes: {self.valid_scopes}") logger.info(f"JWT validation available: {JWT_AVAILABLE}") logger.info(f"PKCE enforcement: {REQUIRE_PKCE}") logger.info(f"Resource indicators: {RESOURCE_INDICATORS}") logger.info(f"OAuth 2.1 Issuer: {AUTH_ISSUER_URL}") logger.info(f"SSL/TLS verification: {ENFORCE_SSL_TLS_VERIFICATION}") logger.info( "🚀 Using generic OAuth 2.1 discovery (RFC 8414) - no provider-specific configuration needed" ) async def validate_token_comprehensive( self, token: str, required_scopes: Set[str] = None, requested_resource: str = None, ) -> Dict[str, Any]: """Comprehensive token validation using our OAuth 2.1 validator.""" if not token_validator: return {"valid": False, "error": "Token validation not available"} return await token_validator.validate_token(token, required_scopes, requested_resource) def check_scopes(self, user_scopes: Set[str], required_scopes: Set[str]) -> bool: """Check if user has required scopes, considering scope hierarchy.""" if token_validator: return token_validator.check_scopes(user_scopes, required_scopes) return super().check_scopes(user_scopes, required_scopes) def expand_scopes(self, scopes: list) -> Set[str]: """Expand scopes to include inherited scopes based on hierarchy.""" if token_validator: return token_validator.expand_scopes(scopes) return set(scopes) def has_read_access(self, user_scopes: Set[str]) -> bool: return self.check_scopes(user_scopes, {"read"}) def has_write_access(self, user_scopes: Set[str]) -> bool: return self.check_scopes(user_scopes, {"write"}) def has_admin_access(self, user_scopes: Set[str]) -> bool: return self.check_scopes(user_scopes, {"admin"}) # Create global instance for easy access if JWT_AVAILABLE: try: oauth_provider = OAuth21BearerAuthProvider() logger.info("OAuth 2.1 Bearer Auth Provider initialized successfully") except Exception as e: logger.warning(f"Failed to initialize OAuth 2.1 Bearer Auth Provider: {e}") oauth_provider = None else: oauth_provider = None def require_scopes(*required_scopes: str): """ Decorator to require specific OAuth scopes with OAuth 2.1 compliance. """ def decorator(func): async def wrapper(*args, **kwargs): try: # Access token information using FastMCP's dependency system access_token: AccessToken = get_access_token() if not access_token or not access_token.token: if ENABLE_AUTH: return { "error": "Authentication required", "required_scopes": list(required_scopes), "oauth_2_1_compliant": True, } else: # Auth disabled, proceed return ( await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs) ) # Perform comprehensive OAuth 2.1 validation if oauth_provider and token_validator: validation_result = await oauth_provider.validate_token_comprehensive( access_token.token, set(required_scopes) ) if not validation_result.get("valid"): return { "error": validation_result.get("error", "Token validation failed"), "required_scopes": list(required_scopes), "oauth_2_1_compliant": True, } else: # Fallback to basic scope checking user_scopes = set(access_token.scopes) if access_token.scopes else set() if oauth_provider and not oauth_provider.check_scopes(user_scopes, set(required_scopes)): return { "error": "Insufficient permissions", "required_scopes": list(required_scopes), "user_scopes": list(user_scopes), "oauth_2_1_compliant": True, } return ( await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs) ) except Exception as e: if ENABLE_AUTH: logger.warning(f"OAuth 2.1 authentication check failed: {e}") return { "error": "Authentication failed", "required_scopes": list(required_scopes), "oauth_2_1_compliant": True, } else: # If auth is disabled, just proceed return ( await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs) ) # Preserve function metadata wrapper.__name__ = func.__name__ wrapper.__doc__ = func.__doc__ wrapper.__annotations__ = func.__annotations__ return wrapper return decorator except ImportError as e: logger.warning(f"FastMCP auth imports not available: {e}") ENABLE_AUTH = False oauth_provider = None def require_scopes(*required_scopes: str): """Fallback decorator when FastMCP auth is not available.""" def decorator(func): return func return decorator else: oauth_provider = None def require_scopes(*required_scopes: str): """Decorator when authentication is disabled.""" def decorator(func): return func return decorator def revoke_token(token: str = None, jti: str = None): """ Revoke a token by adding it to the revocation cache. In production, this should update a database or external revocation list. """ if token: token_hash = hashlib.sha256(token.encode()).hexdigest() REVOKED_TOKENS_CACHE.add(token_hash) if jti: REVOKED_TOKENS_CACHE.add(jti) logger.info(f"Token revoked (JTI: {jti})") def get_oauth_scopes_info() -> Dict[str, Any]: """Get information about OAuth 2.1 configuration and scope definitions.""" return { "oauth_enabled": ENABLE_AUTH, "oauth_2_1_compliant": True, "specification_version": "OAuth 2.1", "mcp_specification": "MCP 2025-06-18", "discovery_method": "RFC 8414 (Generic OAuth 2.1)", "issuer_url": AUTH_ISSUER_URL, "issuer": AUTH_ISSUER_URL, "audience": AUTH_AUDIENCE, "resource_indicators": RESOURCE_INDICATORS, "valid_scopes": AUTH_VALID_SCOPES, "default_scopes": AUTH_DEFAULT_SCOPES, "required_scopes": AUTH_REQUIRED_SCOPES, "scope_definitions": SCOPE_DEFINITIONS, "client_registration_enabled": AUTH_CLIENT_REG_ENABLED, "revocation_enabled": AUTH_REVOCATION_ENABLED, "jwt_available": JWT_AVAILABLE, "security_features": { "pkce_required": REQUIRE_PKCE, "allowed_code_challenge_methods": ["S256"], # OAuth 2.1 requires S256 "resource_indicator_validation": bool(RESOURCE_INDICATORS), "audience_validation": bool(AUTH_AUDIENCE or RESOURCE_INDICATORS), "token_binding": TOKEN_BINDING_ENABLED, "token_introspection": TOKEN_INTROSPECTION_ENABLED, "revocation_checking": TOKEN_REVOCATION_CHECK_ENABLED, "ssl_tls_verification": ENFORCE_SSL_TLS_VERIFICATION, "custom_ca_bundle": bool(CUSTOM_CA_BUNDLE_PATH), "jwks_caching": { "enabled": True, "ttl_seconds": JWKS_CACHE_TTL, "max_size": JWKS_CACHE_MAX_SIZE, }, }, "development_mode": { "is_development": os.getenv("ENVIRONMENT", "development").lower() == "development", "dev_tokens_allowed": ALLOW_DEV_TOKENS, "warning": ("🚨 Development tokens MUST be disabled in production!" if ALLOW_DEV_TOKENS else None), }, } def get_oauth_provider_configs(): """ Get OAuth 2.1 provider setup examples for documentation purposes. Note: With generic OAuth 2.1 discovery, most providers work with just: - AUTH_ISSUER_URL (the OAuth issuer) - AUTH_AUDIENCE (your client/API identifier) No provider-specific configuration needed for OAuth 2.1 compliant providers! """ return { "oauth_2_1_generic": { "name": "Generic OAuth 2.1 Provider", "description": "Any OAuth 2.1 compliant provider (Azure, Google, Okta, Keycloak, etc.)", "required_env": { "AUTH_ISSUER_URL": "https://your-oauth-provider.com", "AUTH_AUDIENCE": "your-client-id-or-api-identifier", }, "optional_env": { "RESOURCE_INDICATORS": "https://your-api.com,https://another-api.com", "REQUIRE_PKCE": "true", "TOKEN_BINDING_ENABLED": "true", "ENFORCE_SSL_TLS_VERIFICATION": "true", "CUSTOM_CA_BUNDLE_PATH": "/path/to/custom/ca-bundle.pem", }, "oauth_2_1_features": { "pkce_support": True, "resource_indicators": True, "discovery": "RFC 8414", "automatic_endpoint_discovery": True, "enhanced_ssl_security": True, }, "notes": [ "✅ Uses standard OAuth 2.1 discovery (/.well-known/oauth-authorization-server)", "✅ No provider-specific configuration needed", "✅ Works with any OAuth 2.1 compliant provider", "✅ Automatic JWKS endpoint discovery", "✅ Supports PKCE, Resource Indicators, and other OAuth 2.1 features", "✅ Enhanced SSL/TLS security for all HTTP requests", "✅ Custom CA bundle support for enterprise environments", ], }, "examples": { "azure": { "name": "Azure AD / Entra ID", "issuer_url_pattern": "https://login.microsoftonline.com/{tenant-id}/v2.0", "example_setup": { "AUTH_ISSUER_URL": "https://login.microsoftonline.com/your-tenant-id/v2.0", "AUTH_AUDIENCE": "your-azure-client-id", }, "oauth_2_1_compliant": True, "discovery_endpoint": "/.well-known/oauth-authorization-server", "setup_docs": "https://docs.microsoft.com/en-us/azure/active-directory/develop/", }, "google": { "name": "Google OAuth 2.0", "issuer_url_pattern": "https://accounts.google.com", "example_setup": { "AUTH_ISSUER_URL": "https://accounts.google.com", "AUTH_AUDIENCE": "your-google-client-id.apps.googleusercontent.com", }, "oauth_2_1_compliant": True, "discovery_endpoint": "/.well-known/openid_configuration", "setup_docs": "https://developers.google.com/identity/protocols/oauth2", }, "okta": { "name": "Okta", "issuer_url_pattern": "https://{domain}/oauth2/default", "example_setup": { "AUTH_ISSUER_URL": "https://your-domain.okta.com/oauth2/default", "AUTH_AUDIENCE": "api://your-api-identifier", }, "oauth_2_1_compliant": True, "discovery_endpoint": "/.well-known/oauth-authorization-server", "setup_docs": "https://developer.okta.com/docs/guides/implement-oauth-for-okta/", }, "keycloak": { "name": "Keycloak", "issuer_url_pattern": "https://{server}/realms/{realm}", "example_setup": { "AUTH_ISSUER_URL": "https://keycloak.example.com/realms/your-realm", "AUTH_AUDIENCE": "your-keycloak-client-id", }, "oauth_2_1_compliant": True, "discovery_endpoint": "/.well-known/openid_configuration", "setup_docs": "https://www.keycloak.org/docs/latest/securing_apps/#_oidc", }, "github": { "name": "GitHub OAuth (Limited Support)", "issuer_url_pattern": "https://github.com", "example_setup": { "AUTH_ISSUER_URL": "https://github.com", "AUTH_AUDIENCE": "your-github-client-id", }, "oauth_2_1_compliant": False, "notes": [ "⚠️ GitHub has limited OAuth 2.1 support", "❌ No PKCE support", "❌ No resource indicators", "❌ No standard discovery endpoints", ], "setup_docs": "https://docs.github.com/en/developers/apps/building-oauth-apps", }, }, "migration_note": { "message": "🚀 Simplified Configuration!", "details": [ "With OAuth 2.1 discovery, you only need AUTH_ISSUER_URL and AUTH_AUDIENCE", "No more provider-specific configuration needed", "All OAuth 2.1 compliant providers work the same way", "Automatic endpoint discovery via RFC 8414", "Enhanced SSL/TLS security for all OAuth communications", "Custom CA bundle support for enterprise deployments", "Legacy provider-specific environment variables still supported for backward compatibility", ], }, } def get_fastmcp_config(server_name: str): """Get FastMCP configuration with OAuth 2.1 authentication and MCP 2025-06-18 compliance.""" config = { "name": server_name, # Note: MCP 2025-06-18 compliance is handled at the application level, # not through FastMCP configuration parameters } if ENABLE_AUTH and oauth_provider: config["auth"] = oauth_provider logger.info("FastMCP configured with OAuth 2.1 Bearer token authentication (MCP 2025-06-18 compliant)") else: logger.info("FastMCP configured without authentication (MCP 2025-06-18 compliant)") # Log the compliance information for clarity logger.info("🚫 JSON-RPC batching disabled per MCP 2025-06-18 specification (application-level)") logger.info("💡 Application-level batch operations (clear_context_batch, etc.) remain available") logger.info("🔒 OAuth 2.1 features enabled: PKCE, Resource Indicators, Audience Validation") logger.info("🚀 Using generic OAuth 2.1 discovery - works with any compliant provider") logger.info(f"🔐 SSL/TLS verification: {'ENABLED' if ENFORCE_SSL_TLS_VERIFICATION else 'DISABLED'}") return config # Export main components __all__ = [ "ENABLE_AUTH", "AUTH_ISSUER_URL", "AUTH_VALID_SCOPES", "AUTH_DEFAULT_SCOPES", "AUTH_REQUIRED_SCOPES", "AUTH_CLIENT_REG_ENABLED", "AUTH_REVOCATION_ENABLED", "AUTH_AUDIENCE", "AUTH_GITHUB_CLIENT_ID", "AUTH_GITHUB_ORG", "OAUTH_DISCOVERY_CACHE_TTL", "RESOURCE_INDICATORS", "REQUIRE_PKCE", "TOKEN_BINDING_ENABLED", "TOKEN_INTROSPECTION_ENABLED", "TOKEN_REVOCATION_CHECK_ENABLED", "ENFORCE_SSL_TLS_VERIFICATION", "CUSTOM_CA_BUNDLE_PATH", "JWT_AVAILABLE", "SCOPE_DEFINITIONS", "OAuth21TokenValidator", "oauth_provider", "require_scopes", "get_oauth_scopes_info", "get_oauth_provider_configs", "get_fastmcp_config", "revoke_token", "token_validator", "create_secure_ssl_context", "create_secure_aiohttp_connector", ]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server