"""Authentication Manager for orchestrating auth flow.
Coordinates token acquisition, refresh, and persistence.
Implements silent-first authentication with browser fallback.
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
from sso_mcp_server import get_logger
from sso_mcp_server.auth.browser import DEFAULT_SCOPES, BrowserAuth
if TYPE_CHECKING:
from sso_mcp_server.auth.token_store import TokenStore
from sso_mcp_server.config import Settings
_logger = get_logger("auth_manager")
# Token refresh threshold: refresh when less than 5 minutes remaining
TOKEN_REFRESH_THRESHOLD_SECONDS = 5 * 60
class AuthManager:
"""Orchestrates authentication flow and token management.
Implements the authentication strategy:
1. Check for cached tokens
2. If found, attempt silent acquisition (includes refresh)
3. If silent fails, fall back to interactive browser auth
4. Proactively refresh tokens before expiry
Attributes:
settings: Application configuration.
token_store: Token persistence store.
"""
def __init__(self, settings: Settings, token_store: TokenStore) -> None:
"""Initialize the auth manager.
Args:
settings: Application settings with Azure credentials.
token_store: Token store for persistence.
"""
self._settings = settings
self._token_store = token_store
self._browser_auth: BrowserAuth | None = None
# Current token state
self._access_token: str | None = None
self._token_expiry: float = 0
_logger.debug("auth_manager_initialized")
@property
def _app(self) -> BrowserAuth:
"""Get the browser auth instance (lazy initialization).
Returns:
BrowserAuth instance.
"""
if self._browser_auth is None:
self._browser_auth = BrowserAuth(
self._settings,
self._token_store.get_cache(),
)
return self._browser_auth
def is_authenticated(self) -> bool:
"""Check if currently authenticated with valid token.
Returns:
True if authenticated with non-expired token.
"""
if self._access_token is None:
return False
# Check if token is expired
return time.time() < self._token_expiry
def _should_refresh_token(self) -> bool:
"""Check if token should be refreshed proactively.
Tokens are refreshed when less than 5 minutes remaining
to ensure seamless 8+ hour sessions.
Returns:
True if token should be refreshed.
"""
if self._token_expiry == 0:
return True
time_to_expiry = self._token_expiry - time.time()
return time_to_expiry < TOKEN_REFRESH_THRESHOLD_SECONDS
def get_access_token(self) -> str | None:
"""Get the current access token.
Returns:
Access token string, or None if not authenticated.
"""
if not self.is_authenticated():
return None
return self._access_token
def ensure_authenticated(self, scopes: list[str] | None = None) -> bool:
"""Ensure the user is authenticated.
Attempts authentication in this order:
1. Check if already authenticated with valid token
2. Try silent token acquisition (from cache or refresh)
3. Fall back to interactive browser authentication
Args:
scopes: OAuth scopes to request.
Returns:
True if authentication succeeded, False otherwise.
"""
if scopes is None:
scopes = DEFAULT_SCOPES
# Already authenticated and token not near expiry
if self.is_authenticated() and not self._should_refresh_token():
_logger.debug("already_authenticated")
return True
# Try silent auth first (includes refresh)
if self._try_silent_auth(scopes):
return True
# Fall back to interactive
return self._do_interactive_auth(scopes)
def _try_silent_auth(self, scopes: list[str]) -> bool:
"""Attempt silent token acquisition.
Tries to get token from cache or refresh without user interaction.
Args:
scopes: OAuth scopes to request.
Returns:
True if silent auth succeeded.
"""
_logger.debug("attempting_silent_auth")
result = self._app.acquire_token_silent(scopes)
if result and "access_token" in result:
self._update_token_state(result)
_logger.info("silent_auth_success")
return True
_logger.debug("silent_auth_failed")
return False
def _do_interactive_auth(self, scopes: list[str]) -> bool:
"""Perform interactive browser authentication.
Opens system browser for Azure SSO login.
Args:
scopes: OAuth scopes to request.
Returns:
True if interactive auth succeeded.
"""
_logger.info("starting_interactive_auth")
result = self._app.authenticate(scopes)
if result and "access_token" in result:
self._update_token_state(result)
_logger.info("interactive_auth_success")
return True
_logger.error("interactive_auth_failed")
return False
def _update_token_state(self, result: dict) -> None:
"""Update internal token state from auth result.
Args:
result: Token result from MSAL.
"""
self._access_token = result.get("access_token")
# Calculate expiry time
expires_in = result.get("expires_in", 3600)
self._token_expiry = time.time() + expires_in
_logger.debug(
"token_state_updated",
expires_in=expires_in,
expiry_time=self._token_expiry,
)
def logout(self) -> None:
"""Log out and clear all authentication state.
Clears in-memory tokens and persisted cache.
"""
_logger.info("logging_out")
# Clear in-memory state
self._access_token = None
self._token_expiry = 0
# Clear persisted cache
self._token_store.clear_cache()
_logger.info("logout_complete")