"""Integration tests for authentication flow.
Tests the complete authentication flow with mocked MSAL and Azure services.
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
from unittest.mock import patch
if TYPE_CHECKING:
from pathlib import Path
class TestAuthFlowIntegration:
"""Integration tests for the complete authentication flow."""
def test_fresh_auth_flow_triggers_browser_login(
self, temp_dir: Path, mock_env_vars: dict
) -> None:
"""Test that fresh authentication triggers browser-based login."""
from sso_mcp_server.auth.manager import AuthManager
from sso_mcp_server.auth.token_store import TokenStore
from sso_mcp_server.config import Settings
settings = Settings.from_env()
token_store = TokenStore(settings.token_cache_path)
manager = AuthManager(settings, token_store)
# Verify no existing tokens
assert not manager.is_authenticated()
with (
patch.object(manager, "_do_interactive_auth", return_value=True) as mock_interactive,
patch.object(manager, "_try_silent_auth", return_value=False),
):
manager.ensure_authenticated()
mock_interactive.assert_called_once()
def test_cached_token_enables_silent_auth(self, temp_dir: Path, mock_env_vars: dict) -> None:
"""Test that cached tokens enable silent authentication without browser."""
from sso_mcp_server.auth.manager import AuthManager
from sso_mcp_server.auth.token_store import TokenStore
from sso_mcp_server.config import Settings
settings = Settings.from_env()
token_store = TokenStore(settings.token_cache_path)
manager = AuthManager(settings, token_store)
# Simulate having a cached token via silent auth success
with patch.object(manager, "_try_silent_auth", return_value=True):
with patch.object(manager, "_do_interactive_auth") as mock_interactive:
manager.ensure_authenticated()
# Should NOT have called interactive auth
mock_interactive.assert_not_called()
def test_expired_token_triggers_refresh(self, temp_dir: Path, mock_env_vars: dict) -> None:
"""Test that expired tokens trigger automatic refresh."""
from sso_mcp_server.auth.manager import AuthManager
from sso_mcp_server.auth.token_store import TokenStore
from sso_mcp_server.config import Settings
settings = Settings.from_env()
token_store = TokenStore(settings.token_cache_path)
manager = AuthManager(settings, token_store)
# Set up expired token
manager._access_token = "expired-token"
manager._token_expiry = time.time() - 60 # Expired
# Silent auth should succeed with refresh
with patch.object(manager, "_try_silent_auth", return_value=True):
assert manager.ensure_authenticated() is True
def test_refresh_failure_falls_back_to_interactive(
self, temp_dir: Path, mock_env_vars: dict
) -> None:
"""Test that refresh failure falls back to interactive auth."""
from sso_mcp_server.auth.manager import AuthManager
from sso_mcp_server.auth.token_store import TokenStore
from sso_mcp_server.config import Settings
settings = Settings.from_env()
token_store = TokenStore(settings.token_cache_path)
manager = AuthManager(settings, token_store)
# Silent auth fails (no cached token or refresh failed)
with (
patch.object(manager, "_try_silent_auth", return_value=False),
patch.object(manager, "_do_interactive_auth", return_value=True) as mock_interactive,
):
manager.ensure_authenticated()
mock_interactive.assert_called_once()
class TestTokenPersistence:
"""Tests for token persistence across server restarts."""
def test_tokens_persisted_to_cache_file(self, temp_dir: Path, mock_env_vars: dict) -> None:
"""Test that tokens are persisted to the cache file."""
from sso_mcp_server.auth.token_store import TokenStore
from sso_mcp_server.config import Settings
settings = Settings.from_env()
cache_path = settings.token_cache_path
token_store = TokenStore(cache_path)
# Add tokens to cache
cache = token_store.get_cache()
cache.deserialize('{"AccessToken": {"key": {"secret": "test"}}, "RefreshToken": {}}')
# Verify tokens exist
assert token_store.has_cached_tokens()
def test_tokens_loaded_from_cache_on_restart(self, temp_dir: Path, mock_env_vars: dict) -> None:
"""Test that tokens are loaded from cache file on server restart."""
from sso_mcp_server.auth.token_store import TokenStore
from sso_mcp_server.config import Settings
settings = Settings.from_env()
cache_path = settings.token_cache_path
# First instance: save tokens
store1 = TokenStore(cache_path)
cache1 = store1.get_cache()
cache1.deserialize('{"AccessToken": {"key": {"secret": "test"}}, "RefreshToken": {}}')
# Second instance (simulating restart): should see tokens
store2 = TokenStore(cache_path)
# The cache should be able to load the persisted data
assert store2.cache_path == cache_path
def test_logout_clears_persisted_tokens(self, temp_dir: Path, mock_env_vars: dict) -> None:
"""Test that logout clears persisted tokens."""
from sso_mcp_server.auth.manager import AuthManager
from sso_mcp_server.auth.token_store import TokenStore
from sso_mcp_server.config import Settings
settings = Settings.from_env()
token_store = TokenStore(settings.token_cache_path)
manager = AuthManager(settings, token_store)
# Set up authenticated state
manager._access_token = "my-token"
manager._token_expiry = time.time() + 3600
# Add tokens to cache
cache = token_store.get_cache()
cache.deserialize('{"AccessToken": {"key": {"secret": "test"}}, "RefreshToken": {}}')
# Logout should clear everything
manager.logout()
assert not manager.is_authenticated()
class TestSilentReauthOnRestart:
"""Tests for silent re-authentication on server restart (FR-004)."""
def test_silent_reauth_succeeds_with_valid_refresh_token(
self, temp_dir: Path, mock_env_vars: dict
) -> None:
"""Test that silent re-auth succeeds when refresh token is valid."""
from sso_mcp_server.auth.manager import AuthManager
from sso_mcp_server.auth.token_store import TokenStore
from sso_mcp_server.config import Settings
settings = Settings.from_env()
token_store = TokenStore(settings.token_cache_path)
manager = AuthManager(settings, token_store)
# Mock silent auth succeeding
with patch.object(manager, "_try_silent_auth", return_value=True):
result = manager.ensure_authenticated()
assert result is True
def test_silent_reauth_fails_prompts_interactive(
self, temp_dir: Path, mock_env_vars: dict
) -> None:
"""Test that failed silent re-auth prompts interactive login."""
from sso_mcp_server.auth.manager import AuthManager
from sso_mcp_server.auth.token_store import TokenStore
from sso_mcp_server.config import Settings
settings = Settings.from_env()
token_store = TokenStore(settings.token_cache_path)
manager = AuthManager(settings, token_store)
# Mock silent auth failing, interactive succeeding
with (
patch.object(manager, "_try_silent_auth", return_value=False),
patch.object(manager, "_do_interactive_auth", return_value=True) as mock_interactive,
):
assert manager.ensure_authenticated() is True
mock_interactive.assert_called_once()
class TestEightHourSession:
"""Tests for 8-hour session maintenance (SC-003).
These tests use mocked time to verify proactive token refresh
maintains the session without re-authentication.
"""
def test_session_maintained_for_8_hours_with_proactive_refresh(
self, temp_dir: Path, mock_env_vars: dict
) -> None:
"""Test that 8-hour session is maintained via proactive token refresh."""
from sso_mcp_server.auth.manager import AuthManager
from sso_mcp_server.auth.token_store import TokenStore
from sso_mcp_server.config import Settings
settings = Settings.from_env()
token_store = TokenStore(settings.token_cache_path)
manager = AuthManager(settings, token_store)
# Simulate 8-hour session with hourly token refreshes
# Token expires every hour, refresh threshold is 5 minutes
start_time = time.time()
current_time = start_time
eight_hours = 8 * 60 * 60
# Initial authentication
manager._access_token = "initial-token"
manager._token_expiry = current_time + 3600 # 1 hour
reauth_count = 0
refresh_count = 0
# Simulate time progression over 8 hours
while (current_time - start_time) < eight_hours:
# Check if we need to refresh (< 5 minutes remaining)
time_to_expiry = manager._token_expiry - current_time
if time_to_expiry < 300: # 5 minutes
# Simulate proactive refresh
refresh_count += 1
manager._access_token = f"refreshed-token-{refresh_count}"
manager._token_expiry = current_time + 3600 # New 1-hour expiry
# Advance time by 30 minutes
current_time += 30 * 60
# Should have refreshed multiple times but never re-authenticated
assert refresh_count > 0
assert reauth_count == 0
# Rough estimate: 8 hours / 55 minutes (refresh at 5 min before expiry)
# Should be around 8-9 refreshes
assert 7 <= refresh_count <= 16