Skip to main content
Glama
auth.py8.72 kB
""" Authentication Manager Module CRITICAL SECURITY: Thread-safe authentication token management. This module replaces the global DYNAMIC_AUTH_TOKEN and PERISCOPE_AUTH_TOKEN with a thread-safe, context-based authentication system. Security improvements: - Thread-safe token storage (no race conditions) - Token expiration and rotation support - Separate token contexts (Kibana, Periscope) - Audit logging for token operations """ import time from threading import Lock from typing import Optional, Dict from dataclasses import dataclass from loguru import logger @dataclass class TokenInfo: """ Metadata for an authentication token. Attributes: token: The actual authentication token expires_at: Unix timestamp when token expires (0 = never expires) context: Token context identifier (e.g., 'kibana', 'periscope', 'user123') created_at: Unix timestamp when token was created """ token: str expires_at: float = 0.0 # 0 means never expires context: str = "default" created_at: float = 0.0 def __post_init__(self): if self.created_at == 0.0: self.created_at = time.time() def is_expired(self) -> bool: """Check if token has expired.""" if self.expires_at == 0.0: return False # Never expires return time.time() > self.expires_at def time_until_expiry(self) -> float: """Get seconds until token expires. Returns -1 if never expires.""" if self.expires_at == 0.0: return -1.0 return max(0.0, self.expires_at - time.time()) class AuthManager: """ Thread-safe authentication token manager. Replaces global authentication variables with proper encapsulation and thread safety. Features: - Thread-safe token storage and retrieval - Token expiration management - Multiple token contexts (Kibana, Periscope, users) - Automatic cleanup of expired tokens - Audit logging for security Example: >>> auth = AuthManager() >>> auth.set_token('kibana', 'my-token-123', ttl=3600) >>> token = auth.get_token('kibana') >>> auth.validate_token('kibana', 'my-token-123') True """ def __init__(self): """Initialize the authentication manager.""" self._tokens: Dict[str, TokenInfo] = {} self._lock = Lock() def set_token( self, context: str, token: str, ttl: float = 0.0 ) -> None: """ Store authentication token for a context. Args: context: Token context (e.g., 'kibana', 'periscope', 'user123') token: Authentication token ttl: Time to live in seconds (0 = never expires) Example: >>> auth.set_token('kibana', 'token123', ttl=3600) >>> auth.set_token('periscope', 'token456') # Never expires """ if not token: raise ValueError("Token cannot be empty") if not context: raise ValueError("Context cannot be empty") expires_at = time.time() + ttl if ttl > 0 else 0.0 with self._lock: self._tokens[context] = TokenInfo( token=token, expires_at=expires_at, context=context ) # Log token update (don't log the actual token for security) expiry_msg = f"expires in {ttl}s" if ttl > 0 else "never expires" logger.info( f"Authentication token set for context '{context}' ({expiry_msg})" ) def get_token(self, context: str) -> Optional[str]: """ Get authentication token for a context. Automatically removes expired tokens. Args: context: Token context Returns: Token string if valid, None if not found or expired Example: >>> token = auth.get_token('kibana') >>> if token: ... # Use token for authentication """ with self._lock: token_info = self._tokens.get(context) if not token_info: return None # Check expiration if token_info.is_expired(): logger.warning(f"Token for context '{context}' has expired, removing") del self._tokens[context] return None return token_info.token def validate_token(self, context: str, token: str) -> bool: """ Validate that a token matches the stored token for a context. Args: context: Token context token: Token to validate Returns: True if token is valid and not expired, False otherwise Example: >>> if auth.validate_token('kibana', user_provided_token): ... # Token is valid """ stored_token = self.get_token(context) return stored_token == token if stored_token else False def rotate_token(self, context: str, new_token: str, ttl: float = 0.0) -> None: """ Rotate token for a context (update with new token). This is a convenience method that logs the rotation for audit purposes. Args: context: Token context new_token: New authentication token ttl: Time to live in seconds Example: >>> auth.rotate_token('kibana', 'new-token-xyz', ttl=7200) """ logger.info(f"Rotating authentication token for context '{context}'") self.set_token(context, new_token, ttl) def remove_token(self, context: str) -> bool: """ Remove token for a context. Args: context: Token context to remove Returns: True if token was removed, False if not found Example: >>> auth.remove_token('kibana') """ with self._lock: if context in self._tokens: del self._tokens[context] logger.info(f"Authentication token removed for context '{context}'") return True return False def cleanup_expired_tokens(self) -> int: """ Remove all expired tokens from storage. Returns: Number of expired tokens removed Example: >>> count = auth.cleanup_expired_tokens() >>> print(f"Removed {count} expired tokens") """ with self._lock: expired_contexts = [ context for context, token_info in self._tokens.items() if token_info.is_expired() ] for context in expired_contexts: del self._tokens[context] if expired_contexts: logger.info( f"Cleaned up {len(expired_contexts)} expired tokens: " f"{', '.join(expired_contexts)}" ) return len(expired_contexts) def get_all_contexts(self) -> list[str]: """ Get list of all contexts with tokens. Returns: List of context identifiers Example: >>> contexts = auth.get_all_contexts() >>> print(f"Active tokens: {', '.join(contexts)}") """ with self._lock: return list(self._tokens.keys()) def has_token(self, context: str) -> bool: """ Check if a valid token exists for a context. Args: context: Token context Returns: True if valid token exists, False otherwise Example: >>> if not auth.has_token('kibana'): ... print("Please set Kibana authentication token") """ return self.get_token(context) is not None # Authentication context constants AUTH_CONTEXT_KIBANA = 'kibana' AUTH_CONTEXT_PERISCOPE = 'periscope' # Global singleton instance # This replaces the global DYNAMIC_AUTH_TOKEN and PERISCOPE_AUTH_TOKEN variables auth_manager = AuthManager() # Convenience functions for backward compatibility def get_kibana_token() -> Optional[str]: """Get Kibana authentication token.""" return auth_manager.get_token('kibana') def set_kibana_token(token: str, ttl: float = 0.0) -> None: """Set Kibana authentication token.""" auth_manager.set_token('kibana', token, ttl) def get_periscope_token() -> Optional[str]: """Get Periscope authentication token.""" return auth_manager.get_token('periscope') def set_periscope_token(token: str, ttl: float = 0.0) -> None: """Set Periscope authentication token.""" auth_manager.set_token('periscope', token, ttl)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gaharivatsa/KIBANA_SERVER'

If you have feedback or need assistance with the MCP directory API, please join our Discord server