"""Browser-based OAuth authentication flow.
Implements OAuth 2.0 Authorization Code flow with PKCE using MSAL.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from msal import PublicClientApplication
from sso_mcp_server import get_logger
if TYPE_CHECKING:
from msal_extensions import PersistedTokenCache
from sso_mcp_server.config import Settings
_logger = get_logger("browser_auth")
# OAuth scopes for Azure Entra ID
# Using default scope that provides access token for the app
DEFAULT_SCOPES = ["User.Read"]
class BrowserAuth:
"""Browser-based OAuth 2.0 authentication with PKCE.
Uses MSAL's acquire_token_interactive for browser-based SSO login.
PKCE is automatically handled by MSAL for public clients.
"""
def __init__(
self,
settings: Settings,
token_cache: PersistedTokenCache,
) -> None:
"""Initialize browser authentication.
Args:
settings: Application settings with Azure credentials.
token_cache: MSAL token cache for persistence.
"""
self._settings = settings
self._token_cache = token_cache
# Create MSAL public client application
authority = f"https://login.microsoftonline.com/{settings.azure_tenant_id}"
self._app = PublicClientApplication(
client_id=settings.azure_client_id,
authority=authority,
token_cache=token_cache,
)
_logger.debug(
"browser_auth_initialized",
client_id=settings.azure_client_id[:8] + "...",
tenant_id=settings.azure_tenant_id[:8] + "...",
)
@property
def app(self) -> PublicClientApplication:
"""Get the MSAL application instance.
Returns:
The configured PublicClientApplication.
"""
return self._app
def authenticate(self, scopes: list[str] | None = None) -> dict[str, Any] | None:
"""Perform interactive browser-based authentication.
Opens the system browser for Azure SSO login. PKCE is
automatically used by MSAL for security.
Args:
scopes: OAuth scopes to request. Defaults to DEFAULT_SCOPES.
Returns:
Token result dictionary containing access_token, or None on failure.
"""
if scopes is None:
scopes = DEFAULT_SCOPES
_logger.info("browser_auth_starting", scopes=scopes)
try:
# acquire_token_interactive opens system browser
# PKCE is automatically used for public clients
result = self._app.acquire_token_interactive(
scopes=scopes,
prompt="select_account", # Allow account selection
)
if "access_token" in result:
_logger.info(
"browser_auth_success",
expires_in=result.get("expires_in"),
)
return result
# Auth failed
error = result.get("error", "unknown")
error_description = result.get("error_description", "No description")
_logger.error(
"browser_auth_failed",
error=error,
error_description=error_description,
)
return None
except Exception as e:
_logger.exception("browser_auth_exception", error=str(e))
return None
def get_accounts(self) -> list[dict[str, Any]]:
"""Get cached accounts from MSAL.
Returns:
List of cached account dictionaries.
"""
return self._app.get_accounts()
def acquire_token_silent(
self,
scopes: list[str] | None = None,
account: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
"""Attempt to acquire token silently from cache.
Tries to get a token from the cache or refresh it without
user interaction.
Args:
scopes: OAuth scopes to request.
account: Specific account to use, or None for first available.
Returns:
Token result dictionary, or None if silent auth fails.
"""
if scopes is None:
scopes = DEFAULT_SCOPES
# Get account to use
if account is None:
accounts = self.get_accounts()
if accounts:
account = accounts[0]
else:
_logger.debug("no_cached_accounts")
return None
_logger.debug("attempting_silent_auth", account=account.get("username"))
result = self._app.acquire_token_silent(
scopes=scopes,
account=account,
)
if result and "access_token" in result:
_logger.debug("silent_auth_success")
return result
_logger.debug("silent_auth_failed")
return None