Skip to main content
Glama

Doris MCP Server

Official
by apache
jwt_manager.py • 17.2 kB
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
JWT Manager Module

Provides comprehensive JWT token management including generation,
validation, refresh and revocation
"""

import time
import uuid
import asyncio
from typing import Dict, Any, Optional, Tuple
from datetime import datetime, timedelta

try:
    import jwt
except ImportError as exc:
    raise ImportError(
        "PyJWT is required for JWT functionality. "
        "Install with: pip install PyJWT[crypto]"
    ) from exc

from .key_manager import KeyManager
from .token_validators import TokenValidator, TokenBlacklist
from ..utils.logger import get_logger

logger = get_logger(__name__)


class JWTManager:
    """JWT Token Manager

    Provides comprehensive JWT token lifecycle management, including:
    - Token generation and signing
    - Token validation and parsing
    - Token refresh mechanism
    - Token revocation and blacklist
    - Automatic key rotation
    """

    # Seconds between two checks of the background key-rotation task.
    _KEY_ROTATION_CHECK_INTERVAL = 3600

    def __init__(self, config):
        """Initialize JWT manager

        Args:
            config: DorisConfig configuration object (with security attribute),
                or an object that exposes the JWT settings directly.
        """
        self.config = config

        # Access JWT settings through the security configuration
        security_config = self._get_security_config()

        self.algorithm = security_config.jwt_algorithm
        self.issuer = security_config.jwt_issuer
        self.audience = security_config.jwt_audience
        self.access_token_expiry = security_config.jwt_access_token_expiry
        self.refresh_token_expiry = security_config.jwt_refresh_token_expiry
        self.enable_refresh = security_config.enable_token_refresh
        self.enable_revocation = security_config.enable_token_revocation

        # Initialize components
        self.key_manager = KeyManager(config)
        self.token_blacklist = TokenBlacklist()
        self.validator = TokenValidator(config, self.token_blacklist)

        # Handle of the automatic key rotation task (set by initialize())
        self._key_rotation_task = None

        logger.info(f"JWTManager initialized with algorithm: {self.algorithm}")

    def _get_security_config(self):
        """Return the object that carries the JWT/security settings.

        Uses ``self.config.security`` when that attribute exists; otherwise
        ``self.config`` itself is treated as the security configuration.
        Looked up on every call so later changes to the config are honoured.
        """
        if hasattr(self.config, 'security'):
            return self.config.security
        # Fallback if config is passed directly as SecurityConfig
        return self.config

    async def initialize(self) -> bool:
        """Initialize JWT manager

        Initializes the key manager, starts the token validator and, when the
        key manager's ``key_rotation_interval`` is positive, schedules the
        background key rotation task.

        Returns:
            True on success; False if the key manager failed to initialize or
            any step raised (the error is logged, not propagated).
        """
        try:
            # Initialize key manager
            if not await self.key_manager.initialize():
                logger.error("Failed to initialize key manager")
                return False

            # Start token validator
            await self.validator.start()

            # Start automatic key rotation
            if self.key_manager.key_rotation_interval > 0:
                self._key_rotation_task = asyncio.create_task(self._auto_key_rotation())

            logger.info("JWTManager initialization completed")
            return True

        except Exception as e:
            logger.error(f"Failed to initialize JWTManager: {e}")
            return False

    async def shutdown(self):
        """Shutdown JWT manager

        Cancels the key rotation task (if one was started) and stops the
        validator. Errors are logged and swallowed so shutdown is best-effort.
        """
        try:
            # Stop key rotation task
            if self._key_rotation_task:
                self._key_rotation_task.cancel()
                try:
                    await self._key_rotation_task
                except asyncio.CancelledError:
                    pass
                # Drop the handle so a repeated shutdown() is a no-op here.
                self._key_rotation_task = None

            # Stop validator
            await self.validator.stop()

            logger.info("JWTManager shutdown completed")

        except Exception as e:
            logger.error(f"Error during JWTManager shutdown: {e}")

    async def generate_tokens(self, user_info: Dict[str, Any],
                              custom_claims: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Generate access token and refresh token

        Args:
            user_info: User information dictionary, containing user_id, roles,
                permissions, etc. Missing roles/permissions default to empty
                lists and security_level defaults to 'internal'.
            custom_claims: Custom claims merged into the access token payload.
                They are applied before 'exp' and 'token_type' are set, so
                those two claims cannot be overridden.

        Returns:
            Dictionary containing access_token, token_type, expires_in,
            user_id and issued_at, plus refresh_token and refresh_expires_in
            when token refresh is enabled.

        Raises:
            Exception: Any error raised while building or signing the tokens
                is logged and re-raised.
        """
        try:
            current_time = int(time.time())
            jti = str(uuid.uuid4())

            user_id = user_info.get('user_id')
            roles = user_info.get('roles', [])
            permissions = user_info.get('permissions', [])
            security_level = user_info.get('security_level', 'internal')

            # Build base payload
            base_payload = {
                'iss': self.issuer,
                'aud': self.audience,
                'iat': current_time,
                'jti': jti,
                'sub': user_id,
                'roles': roles,
                'permissions': permissions,
                'security_level': security_level
            }

            # Add custom claims
            if custom_claims:
                base_payload.update(custom_claims)

            # Generate access token
            access_payload = base_payload.copy()
            access_payload.update({
                'exp': current_time + self.access_token_expiry,
                'token_type': 'access'
            })

            access_token = await self._sign_token(access_payload)

            result = {
                'access_token': access_token,
                'token_type': 'Bearer',
                'expires_in': self.access_token_expiry,
                'user_id': user_id,
                'issued_at': current_time
            }

            # Generate refresh token (if enabled)
            if self.enable_refresh:
                refresh_jti = str(uuid.uuid4())
                refresh_payload = {
                    'iss': self.issuer,
                    'aud': self.audience,
                    'iat': current_time,
                    'exp': current_time + self.refresh_token_expiry,
                    'jti': refresh_jti,
                    'sub': user_id,
                    'token_type': 'refresh',
                    'access_jti': jti,  # Associated access token ID
                    # refresh_token() rebuilds the user from these claims;
                    # without them a refreshed access token would lose its
                    # roles/permissions and fall back to the defaults.
                    'roles': roles,
                    'permissions': permissions,
                    'security_level': security_level
                }

                refresh_token = await self._sign_token(refresh_payload)
                result.update({
                    'refresh_token': refresh_token,
                    'refresh_expires_in': self.refresh_token_expiry
                })

            logger.info(f"Generated tokens for user: {user_id}")
            return result

        except Exception as e:
            logger.error(f"Failed to generate tokens: {e}")
            raise

    async def _sign_token(self, payload: Dict[str, Any]) -> str:
        """Sign JWT token

        Args:
            payload: JWT payload

        Returns:
            Signed JWT token

        Raises:
            Exception: Any error from key retrieval or encoding is logged and
                re-raised.
        """
        try:
            # The key manager hands out the signing key for the configured
            # algorithm; the encode call is the same for symmetric and
            # asymmetric algorithms.
            signing_key = self.key_manager.get_private_key()
            return jwt.encode(payload, signing_key, algorithm=self.algorithm)

        except Exception as e:
            logger.error(f"Failed to sign token: {e}")
            raise

    async def validate_token(self, token: str, token_type: str = 'access') -> Dict[str, Any]:
        """Validate JWT token

        Args:
            token: JWT token string
            token_type: Token type ('access' or 'refresh')

        Returns:
            The result of the validator's claim checks for the decoded payload

        Raises:
            ValueError: Token validation failed (expired, invalid, wrong
                token type, or rejected by the validator)
        """
        try:
            # Decode token
            verification_key = self.key_manager.get_public_key()

            # Get security configuration
            security_config = self._get_security_config()

            # JWT decoding options
            options = {
                'verify_signature': security_config.jwt_verify_signature,
                'verify_exp': security_config.jwt_require_exp,
                'verify_iat': security_config.jwt_require_iat,
                'verify_nbf': security_config.jwt_require_nbf,
                'verify_aud': security_config.jwt_verify_audience,
                'verify_iss': security_config.jwt_verify_issuer,
            }

            # Decode JWT
            payload = jwt.decode(
                token,
                verification_key,
                algorithms=[self.algorithm],
                audience=self.audience if security_config.jwt_verify_audience else None,
                issuer=self.issuer if security_config.jwt_verify_issuer else None,
                leeway=security_config.jwt_leeway,
                options=options
            )

            # Check token type
            if payload.get('token_type') != token_type:
                raise ValueError(f"Invalid token type: expected {token_type}")

            # Use validator for additional checks
            validation_result = await self.validator.validate_claims(payload)

            logger.info(f"Token validation successful for user: {payload.get('sub')}")
            return validation_result

        except jwt.ExpiredSignatureError as e:
            raise ValueError("Token has expired") from e
        except jwt.InvalidTokenError as e:
            raise ValueError(f"Invalid token: {str(e)}") from e
        except Exception as e:
            # Also reached by the token-type ValueError above and by validator
            # failures; everything surfaces to callers as ValueError.
            logger.error(f"Token validation failed: {e}")
            raise ValueError(f"Token validation failed: {str(e)}") from e

    async def refresh_token(self, refresh_token: str) -> Dict[str, Any]:
        """Refresh access token

        Args:
            refresh_token: Refresh token

        Returns:
            New token pair

        Raises:
            ValueError: Token refresh is disabled, or the refresh token failed
                validation.
        """
        if not self.enable_refresh:
            raise ValueError("Token refresh is disabled")

        try:
            # Validate refresh token
            refresh_result = await self.validate_token(refresh_token, 'refresh')
            refresh_payload = refresh_result['payload']

            # Revoke associated access token (if revocation is enabled)
            if self.enable_revocation:
                access_jti = refresh_payload.get('access_jti')
                if access_jti:
                    # TODO: the old access token is intentionally NOT revoked
                    # here yet. Its expiration time is not stored in the
                    # refresh token, so doing this properly needs more stored
                    # information or a different strategy.
                    pass

            # Build new user information
            user_info = {
                'user_id': refresh_payload.get('sub'),
                'roles': refresh_payload.get('roles', []),
                'permissions': refresh_payload.get('permissions', []),
                'security_level': refresh_payload.get('security_level', 'internal')
            }

            # Generate new token pair
            new_tokens = await self.generate_tokens(user_info)

            logger.info(f"Token refreshed for user: {user_info['user_id']}")
            return new_tokens

        except Exception as e:
            logger.error(f"Token refresh failed: {e}")
            raise

    async def revoke_token(self, token: str) -> bool:
        """Revoke token

        Args:
            token: Token to revoke

        Returns:
            Whether revocation was successful. False when revocation is
            disabled, the token cannot be decoded, or it lacks 'jti'/'exp'.
        """
        if not self.enable_revocation:
            logger.warning("Token revocation is disabled")
            return False

        try:
            # Decode token to get JTI and expiration time
            verification_key = self.key_manager.get_public_key()
            payload = jwt.decode(
                token,
                verification_key,
                algorithms=[self.algorithm],
                options={
                    # Allow decoding expired tokens
                    'verify_exp': False,
                    # Issued tokens carry an 'aud' claim; with no audience
                    # passed PyJWT would reject them with "Invalid audience".
                    # The signature is still verified, which is all that is
                    # needed to accept a revocation request.
                    'verify_aud': False,
                }
            )

            jti = payload.get('jti')
            exp = payload.get('exp')

            if not jti or not exp:
                logger.error("Token missing required claims for revocation")
                return False

            # Add to blacklist
            await self.validator.revoke_token(jti, exp)

            logger.info(f"Token {jti} revoked successfully")
            return True

        except Exception as e:
            logger.error(f"Token revocation failed: {e}")
            return False

    async def decode_token_unsafe(self, token: str) -> Dict[str, Any]:
        """Decode token without verifying signature (for debugging only)

        The returned claims are NOT authenticated and must never be used for
        authorization decisions.

        Args:
            token: JWT token

        Returns:
            Token payload

        Raises:
            Exception: Decoding errors are logged and re-raised.
        """
        try:
            return jwt.decode(token, options={'verify_signature': False})
        except Exception as e:
            logger.error(f"Failed to decode token: {e}")
            raise

    async def get_token_info(self, token: str) -> Dict[str, Any]:
        """Get token information (without verifying signature)

        Args:
            token: JWT token

        Returns:
            Token information: the well-known claims of the token plus
            'is_expired', which is None when the token has no (truthy) 'exp'.

        Raises:
            Exception: Decoding errors are logged and re-raised.
        """
        try:
            payload = await self.decode_token_unsafe(token)

            exp = payload.get('exp')
            is_expired = (exp < time.time()) if exp else None

            return {
                'jti': payload.get('jti'),
                'sub': payload.get('sub'),
                'iss': payload.get('iss'),
                'aud': payload.get('aud'),
                'iat': payload.get('iat'),
                'exp': exp,
                'token_type': payload.get('token_type'),
                'roles': payload.get('roles'),
                'permissions': payload.get('permissions'),
                'security_level': payload.get('security_level'),
                'is_expired': is_expired
            }

        except Exception as e:
            logger.error(f"Failed to get token info: {e}")
            raise

    async def _auto_key_rotation(self):
        """Automatic key rotation task

        Loops until cancelled: asks the key manager whether the key is
        expired, rotates if so, then sleeps for the check interval.
        """
        while True:
            try:
                # Check if key rotation is needed
                if await self.key_manager.is_key_expired():
                    logger.info("Key rotation needed, rotating keys...")
                    await self.key_manager.rotate_keys()

                # Wait until next check
                await asyncio.sleep(self._KEY_ROTATION_CHECK_INTERVAL)

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in auto key rotation: {e}")
                # Back off for one full check interval before retrying
                await asyncio.sleep(self._KEY_ROTATION_CHECK_INTERVAL)

    async def get_public_key_info(self) -> Dict[str, Any]:
        """Get public key information (for client verification)

        Returns:
            Dictionary with the configured algorithm, the public key exported
            as PEM by the key manager, and the key manager's key info.
        """
        key_info = await self.key_manager.get_key_info()
        public_key_pem = await self.key_manager.export_public_key_pem()

        return {
            'algorithm': self.algorithm,
            'public_key_pem': public_key_pem,
            'key_info': key_info
        }

    async def get_manager_stats(self) -> Dict[str, Any]:
        """Get manager statistics

        Returns:
            Dictionary with the effective JWT settings ('jwt_config'), the key
            manager's key info ('key_manager') and the validator's statistics
            ('validator').
        """
        key_info = await self.key_manager.get_key_info()
        validation_stats = await self.validator.get_validation_stats()

        return {
            'jwt_config': {
                'algorithm': self.algorithm,
                'issuer': self.issuer,
                'audience': self.audience,
                'access_token_expiry': self.access_token_expiry,
                'refresh_token_expiry': self.refresh_token_expiry,
                'enable_refresh': self.enable_refresh,
                'enable_revocation': self.enable_revocation
            },
            'key_manager': key_info,
            'validator': validation_stats
        }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/apache/doris-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.