Skip to main content
Glama
manager.py6.34 kB
"""Authentication Manager for orchestrating auth flow. Coordinates token acquisition, refresh, and persistence. Implements silent-first authentication with browser fallback. """ from __future__ import annotations import time from typing import TYPE_CHECKING from sso_mcp_server import get_logger from sso_mcp_server.auth.browser import DEFAULT_SCOPES, BrowserAuth if TYPE_CHECKING: from sso_mcp_server.auth.token_store import TokenStore from sso_mcp_server.config import Settings _logger = get_logger("auth_manager") # Token refresh threshold: refresh when less than 5 minutes remaining TOKEN_REFRESH_THRESHOLD_SECONDS = 5 * 60 class AuthManager: """Orchestrates authentication flow and token management. Implements the authentication strategy: 1. Check for cached tokens 2. If found, attempt silent acquisition (includes refresh) 3. If silent fails, fall back to interactive browser auth 4. Proactively refresh tokens before expiry Attributes: settings: Application configuration. token_store: Token persistence store. """ def __init__(self, settings: Settings, token_store: TokenStore) -> None: """Initialize the auth manager. Args: settings: Application settings with Azure credentials. token_store: Token store for persistence. """ self._settings = settings self._token_store = token_store self._browser_auth: BrowserAuth | None = None # Current token state self._access_token: str | None = None self._token_expiry: float = 0 _logger.debug("auth_manager_initialized") @property def _app(self) -> BrowserAuth: """Get the browser auth instance (lazy initialization). Returns: BrowserAuth instance. """ if self._browser_auth is None: self._browser_auth = BrowserAuth( self._settings, self._token_store.get_cache(), ) return self._browser_auth def is_authenticated(self) -> bool: """Check if currently authenticated with valid token. Returns: True if authenticated with non-expired token. """ if self._access_token is None: return False # Check if token is expired return time.time() < self._token_expiry def _should_refresh_token(self) -> bool: """Check if token should be refreshed proactively. Tokens are refreshed when less than 5 minutes remaining to ensure seamless 8+ hour sessions. Returns: True if token should be refreshed. """ if self._token_expiry == 0: return True time_to_expiry = self._token_expiry - time.time() return time_to_expiry < TOKEN_REFRESH_THRESHOLD_SECONDS def get_access_token(self) -> str | None: """Get the current access token. Returns: Access token string, or None if not authenticated. """ if not self.is_authenticated(): return None return self._access_token def ensure_authenticated(self, scopes: list[str] | None = None) -> bool: """Ensure the user is authenticated. Attempts authentication in this order: 1. Check if already authenticated with valid token 2. Try silent token acquisition (from cache or refresh) 3. Fall back to interactive browser authentication Args: scopes: OAuth scopes to request. Returns: True if authentication succeeded, False otherwise. """ if scopes is None: scopes = DEFAULT_SCOPES # Already authenticated and token not near expiry if self.is_authenticated() and not self._should_refresh_token(): _logger.debug("already_authenticated") return True # Try silent auth first (includes refresh) if self._try_silent_auth(scopes): return True # Fall back to interactive return self._do_interactive_auth(scopes) def _try_silent_auth(self, scopes: list[str]) -> bool: """Attempt silent token acquisition. Tries to get token from cache or refresh without user interaction. Args: scopes: OAuth scopes to request. Returns: True if silent auth succeeded. """ _logger.debug("attempting_silent_auth") result = self._app.acquire_token_silent(scopes) if result and "access_token" in result: self._update_token_state(result) _logger.info("silent_auth_success") return True _logger.debug("silent_auth_failed") return False def _do_interactive_auth(self, scopes: list[str]) -> bool: """Perform interactive browser authentication. Opens system browser for Azure SSO login. Args: scopes: OAuth scopes to request. Returns: True if interactive auth succeeded. """ _logger.info("starting_interactive_auth") result = self._app.authenticate(scopes) if result and "access_token" in result: self._update_token_state(result) _logger.info("interactive_auth_success") return True _logger.error("interactive_auth_failed") return False def _update_token_state(self, result: dict) -> None: """Update internal token state from auth result. Args: result: Token result from MSAL. """ self._access_token = result.get("access_token") # Calculate expiry time expires_in = result.get("expires_in", 3600) self._token_expiry = time.time() + expires_in _logger.debug( "token_state_updated", expires_in=expires_in, expiry_time=self._token_expiry, ) def logout(self) -> None: """Log out and clear all authentication state. Clears in-memory tokens and persisted cache. """ _logger.info("logging_out") # Clear in-memory state self._access_token = None self._token_expiry = 0 # Clear persisted cache self._token_store.clear_cache() _logger.info("logout_complete")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DauQuangThanh/sso-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server