Skip to main content
Glama
test_auth_flow.py10.1 kB
"""Integration tests for authentication flow. Tests the complete authentication flow with mocked MSAL and Azure services. """ from __future__ import annotations import time from typing import TYPE_CHECKING from unittest.mock import patch if TYPE_CHECKING: from pathlib import Path class TestAuthFlowIntegration: """Integration tests for the complete authentication flow.""" def test_fresh_auth_flow_triggers_browser_login( self, temp_dir: Path, mock_env_vars: dict ) -> None: """Test that fresh authentication triggers browser-based login.""" from sso_mcp_server.auth.manager import AuthManager from sso_mcp_server.auth.token_store import TokenStore from sso_mcp_server.config import Settings settings = Settings.from_env() token_store = TokenStore(settings.token_cache_path) manager = AuthManager(settings, token_store) # Verify no existing tokens assert not manager.is_authenticated() with ( patch.object(manager, "_do_interactive_auth", return_value=True) as mock_interactive, patch.object(manager, "_try_silent_auth", return_value=False), ): manager.ensure_authenticated() mock_interactive.assert_called_once() def test_cached_token_enables_silent_auth(self, temp_dir: Path, mock_env_vars: dict) -> None: """Test that cached tokens enable silent authentication without browser.""" from sso_mcp_server.auth.manager import AuthManager from sso_mcp_server.auth.token_store import TokenStore from sso_mcp_server.config import Settings settings = Settings.from_env() token_store = TokenStore(settings.token_cache_path) manager = AuthManager(settings, token_store) # Simulate having a cached token via silent auth success with patch.object(manager, "_try_silent_auth", return_value=True): with patch.object(manager, "_do_interactive_auth") as mock_interactive: manager.ensure_authenticated() # Should NOT have called interactive auth mock_interactive.assert_not_called() def test_expired_token_triggers_refresh(self, temp_dir: Path, mock_env_vars: dict) -> None: """Test that expired tokens trigger automatic refresh.""" from sso_mcp_server.auth.manager import AuthManager from sso_mcp_server.auth.token_store import TokenStore from sso_mcp_server.config import Settings settings = Settings.from_env() token_store = TokenStore(settings.token_cache_path) manager = AuthManager(settings, token_store) # Set up expired token manager._access_token = "expired-token" manager._token_expiry = time.time() - 60 # Expired # Silent auth should succeed with refresh with patch.object(manager, "_try_silent_auth", return_value=True): assert manager.ensure_authenticated() is True def test_refresh_failure_falls_back_to_interactive( self, temp_dir: Path, mock_env_vars: dict ) -> None: """Test that refresh failure falls back to interactive auth.""" from sso_mcp_server.auth.manager import AuthManager from sso_mcp_server.auth.token_store import TokenStore from sso_mcp_server.config import Settings settings = Settings.from_env() token_store = TokenStore(settings.token_cache_path) manager = AuthManager(settings, token_store) # Silent auth fails (no cached token or refresh failed) with ( patch.object(manager, "_try_silent_auth", return_value=False), patch.object(manager, "_do_interactive_auth", return_value=True) as mock_interactive, ): manager.ensure_authenticated() mock_interactive.assert_called_once() class TestTokenPersistence: """Tests for token persistence across server restarts.""" def test_tokens_persisted_to_cache_file(self, temp_dir: Path, mock_env_vars: dict) -> None: """Test that tokens are persisted to the cache file.""" from sso_mcp_server.auth.token_store import TokenStore from sso_mcp_server.config import Settings settings = Settings.from_env() cache_path = settings.token_cache_path token_store = TokenStore(cache_path) # Add tokens to cache cache = token_store.get_cache() cache.deserialize('{"AccessToken": {"key": {"secret": "test"}}, "RefreshToken": {}}') # Verify tokens exist assert token_store.has_cached_tokens() def test_tokens_loaded_from_cache_on_restart(self, temp_dir: Path, mock_env_vars: dict) -> None: """Test that tokens are loaded from cache file on server restart.""" from sso_mcp_server.auth.token_store import TokenStore from sso_mcp_server.config import Settings settings = Settings.from_env() cache_path = settings.token_cache_path # First instance: save tokens store1 = TokenStore(cache_path) cache1 = store1.get_cache() cache1.deserialize('{"AccessToken": {"key": {"secret": "test"}}, "RefreshToken": {}}') # Second instance (simulating restart): should see tokens store2 = TokenStore(cache_path) # The cache should be able to load the persisted data assert store2.cache_path == cache_path def test_logout_clears_persisted_tokens(self, temp_dir: Path, mock_env_vars: dict) -> None: """Test that logout clears persisted tokens.""" from sso_mcp_server.auth.manager import AuthManager from sso_mcp_server.auth.token_store import TokenStore from sso_mcp_server.config import Settings settings = Settings.from_env() token_store = TokenStore(settings.token_cache_path) manager = AuthManager(settings, token_store) # Set up authenticated state manager._access_token = "my-token" manager._token_expiry = time.time() + 3600 # Add tokens to cache cache = token_store.get_cache() cache.deserialize('{"AccessToken": {"key": {"secret": "test"}}, "RefreshToken": {}}') # Logout should clear everything manager.logout() assert not manager.is_authenticated() class TestSilentReauthOnRestart: """Tests for silent re-authentication on server restart (FR-004).""" def test_silent_reauth_succeeds_with_valid_refresh_token( self, temp_dir: Path, mock_env_vars: dict ) -> None: """Test that silent re-auth succeeds when refresh token is valid.""" from sso_mcp_server.auth.manager import AuthManager from sso_mcp_server.auth.token_store import TokenStore from sso_mcp_server.config import Settings settings = Settings.from_env() token_store = TokenStore(settings.token_cache_path) manager = AuthManager(settings, token_store) # Mock silent auth succeeding with patch.object(manager, "_try_silent_auth", return_value=True): result = manager.ensure_authenticated() assert result is True def test_silent_reauth_fails_prompts_interactive( self, temp_dir: Path, mock_env_vars: dict ) -> None: """Test that failed silent re-auth prompts interactive login.""" from sso_mcp_server.auth.manager import AuthManager from sso_mcp_server.auth.token_store import TokenStore from sso_mcp_server.config import Settings settings = Settings.from_env() token_store = TokenStore(settings.token_cache_path) manager = AuthManager(settings, token_store) # Mock silent auth failing, interactive succeeding with ( patch.object(manager, "_try_silent_auth", return_value=False), patch.object(manager, "_do_interactive_auth", return_value=True) as mock_interactive, ): assert manager.ensure_authenticated() is True mock_interactive.assert_called_once() class TestEightHourSession: """Tests for 8-hour session maintenance (SC-003). These tests use mocked time to verify proactive token refresh maintains the session without re-authentication. """ def test_session_maintained_for_8_hours_with_proactive_refresh( self, temp_dir: Path, mock_env_vars: dict ) -> None: """Test that 8-hour session is maintained via proactive token refresh.""" from sso_mcp_server.auth.manager import AuthManager from sso_mcp_server.auth.token_store import TokenStore from sso_mcp_server.config import Settings settings = Settings.from_env() token_store = TokenStore(settings.token_cache_path) manager = AuthManager(settings, token_store) # Simulate 8-hour session with hourly token refreshes # Token expires every hour, refresh threshold is 5 minutes start_time = time.time() current_time = start_time eight_hours = 8 * 60 * 60 # Initial authentication manager._access_token = "initial-token" manager._token_expiry = current_time + 3600 # 1 hour reauth_count = 0 refresh_count = 0 # Simulate time progression over 8 hours while (current_time - start_time) < eight_hours: # Check if we need to refresh (< 5 minutes remaining) time_to_expiry = manager._token_expiry - current_time if time_to_expiry < 300: # 5 minutes # Simulate proactive refresh refresh_count += 1 manager._access_token = f"refreshed-token-{refresh_count}" manager._token_expiry = current_time + 3600 # New 1-hour expiry # Advance time by 30 minutes current_time += 30 * 60 # Should have refreshed multiple times but never re-authenticated assert refresh_count > 0 assert reauth_count == 0 # Rough estimate: 8 hours / 55 minutes (refresh at 5 min before expiry) # Should be around 8-9 refreshes assert 7 <= refresh_count <= 16

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DauQuangThanh/sso-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server