"""Unit tests for Auth Manager component.
Tests authentication orchestration, token management, and refresh logic.
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
from unittest.mock import MagicMock, patch
if TYPE_CHECKING:
from pathlib import Path
class TestAuthManager:
    """Exercise AuthManager's authentication state, token access and logout."""

    @staticmethod
    def _new_manager():
        """Return an AuthManager built from env settings and a new TokenStore.

        The project imports are kept function-local so they are resolved
        while the calling test is running, not at module import time.
        """
        from sso_mcp_server.auth.manager import AuthManager
        from sso_mcp_server.auth.token_store import TokenStore
        from sso_mcp_server.config import Settings

        env_settings = Settings.from_env()
        store = TokenStore(env_settings.token_cache_path)
        return AuthManager(env_settings, store)

    def test_is_authenticated_returns_false_initially(
        self, temp_dir: Path, mock_env_vars: dict
    ) -> None:
        """A freshly built manager reports that it is not authenticated."""
        auth = self._new_manager()

        outcome = auth.is_authenticated()

        assert outcome is False

    def test_is_authenticated_returns_true_with_valid_token(
        self, temp_dir: Path, mock_env_vars: dict
    ) -> None:
        """A token whose expiry lies in the future counts as authenticated."""
        auth = self._new_manager()
        now = time.time()

        # Inject a token that stays valid for another hour.
        auth._access_token = "valid-token"
        auth._token_expiry = now + 3600

        outcome = auth.is_authenticated()

        assert outcome is True

    def test_is_authenticated_returns_false_with_expired_token(
        self, temp_dir: Path, mock_env_vars: dict
    ) -> None:
        """A token whose expiry lies in the past does not count."""
        auth = self._new_manager()
        now = time.time()

        # Inject a token that expired an hour ago.
        auth._access_token = "expired-token"
        auth._token_expiry = now - 3600

        outcome = auth.is_authenticated()

        assert outcome is False

    def test_should_refresh_token_returns_false_when_not_near_expiry(
        self, temp_dir: Path, mock_env_vars: dict
    ) -> None:
        """No refresh is requested while a full hour of validity remains."""
        auth = self._new_manager()

        one_hour_ahead = time.time() + 3600
        auth._token_expiry = one_hour_ahead

        assert auth._should_refresh_token() is False

    def test_should_refresh_token_returns_true_when_near_expiry(
        self, temp_dir: Path, mock_env_vars: dict
    ) -> None:
        """A refresh is requested when fewer than 5 minutes remain."""
        auth = self._new_manager()

        # Three minutes left, which is inside the five-minute window.
        three_minutes_ahead = time.time() + 180
        auth._token_expiry = three_minutes_ahead

        assert auth._should_refresh_token() is True

    def test_should_refresh_token_returns_true_when_expired(
        self, temp_dir: Path, mock_env_vars: dict
    ) -> None:
        """A refresh is requested once the token has already expired."""
        auth = self._new_manager()

        one_minute_ago = time.time() - 60
        auth._token_expiry = one_minute_ago

        assert auth._should_refresh_token() is True

    def test_get_access_token_returns_none_when_not_authenticated(
        self, temp_dir: Path, mock_env_vars: dict
    ) -> None:
        """Without any token set, get_access_token yields None."""
        auth = self._new_manager()

        token = auth.get_access_token()

        assert token is None

    def test_get_access_token_returns_token_when_authenticated(
        self, temp_dir: Path, mock_env_vars: dict
    ) -> None:
        """With a valid token set, get_access_token hands that token back."""
        auth = self._new_manager()
        now = time.time()

        # Inject a token that stays valid for another hour.
        auth._access_token = "my-access-token"
        auth._token_expiry = now + 3600

        token = auth.get_access_token()

        assert token == "my-access-token"

    def test_logout_clears_tokens(self, temp_dir: Path, mock_env_vars: dict) -> None:
        """logout() leaves the manager unauthenticated and without a token."""
        auth = self._new_manager()
        now = time.time()

        # Start from an authenticated state and confirm it took effect.
        auth._access_token = "my-access-token"
        auth._token_expiry = now + 3600
        assert auth.is_authenticated() is True

        auth.logout()

        assert auth.is_authenticated() is False
        assert auth.get_access_token() is None
class TestAuthManagerMSALIntegration:
    """Tests for AuthManager MSAL integration.

    Both tests replace ``_try_silent_auth`` and ``_do_interactive_auth`` on
    the manager instance, so neither a silent nor an interactive flow is
    really executed; only the orchestration in ``ensure_authenticated`` is
    observed.
    """

    @staticmethod
    def _make_manager():
        """Return an AuthManager built from env settings and a new TokenStore.

        The project imports are kept function-local so they are resolved
        while the calling test is running, not at module import time.
        """
        from sso_mcp_server.auth.manager import AuthManager
        from sso_mcp_server.auth.token_store import TokenStore
        from sso_mcp_server.config import Settings

        settings = Settings.from_env()
        token_store = TokenStore(settings.token_cache_path)
        return AuthManager(settings, token_store)

    def test_ensure_authenticated_attempts_silent_acquisition_first(
        self, temp_dir: Path, mock_env_vars: dict
    ) -> None:
        """Test that ensure_authenticated tries silent token acquisition first."""
        manager = self._make_manager()

        # Mock the browser auth instance (internal attribute). Because
        # _try_silent_auth is patched below, this mock is not expected to be
        # consulted; it is kept so the manager holds a stand-in object.
        mock_browser_auth = MagicMock()
        mock_browser_auth.acquire_token_silent.return_value = {
            "access_token": "silent-token",
            "expires_in": 3600,
        }
        manager._browser_auth = mock_browser_auth

        silent_patch = patch.object(manager, "_try_silent_auth", return_value=True)
        # Interactive auth is patched too, so a regression fails the test
        # rather than running the real interactive flow.
        interactive_patch = patch.object(
            manager, "_do_interactive_auth", return_value=False
        )
        with silent_patch as silent_auth, interactive_patch as interactive_auth:
            assert manager.ensure_authenticated() is True

        # Silent acquisition was used and was sufficient on its own.
        silent_auth.assert_called_once()
        interactive_auth.assert_not_called()

    def test_ensure_authenticated_falls_back_to_interactive_on_silent_failure(
        self, temp_dir: Path, mock_env_vars: dict
    ) -> None:
        """Test that ensure_authenticated falls back to interactive auth."""
        manager = self._make_manager()

        # Mock silent auth failing, interactive succeeding.
        silent_patch = patch.object(manager, "_try_silent_auth", return_value=False)
        interactive_patch = patch.object(
            manager, "_do_interactive_auth", return_value=True
        )
        with silent_patch as silent_auth, interactive_patch as interactive_auth:
            assert manager.ensure_authenticated() is True

        # Assert on the bound mocks rather than reading the attribute back
        # from the manager, which is only a mock while the patch is active.
        silent_auth.assert_called_once()
        interactive_auth.assert_called_once()
class TestAuthManagerRefreshThreshold:
    """Tests for token refresh threshold (5 minutes)."""

    # Token refresh threshold is 5 minutes = 300 seconds.
    # NOTE(review): assumed to mirror the threshold AuthManager applies in
    # _should_refresh_token; confirm against the implementation.
    REFRESH_THRESHOLD = 300

    @staticmethod
    def _make_manager():
        """Return an AuthManager built from env settings and a new TokenStore.

        The project imports are kept function-local so they are resolved
        while the calling test is running, not at module import time.
        """
        from sso_mcp_server.auth.manager import AuthManager
        from sso_mcp_server.auth.token_store import TokenStore
        from sso_mcp_server.config import Settings

        settings = Settings.from_env()
        token_store = TokenStore(settings.token_cache_path)
        return AuthManager(settings, token_store)

    def test_token_refreshed_when_less_than_5_minutes_remaining(
        self, temp_dir: Path, mock_env_vars: dict
    ) -> None:
        """Test that token is refreshed when less than 5 minutes remaining."""
        manager = self._make_manager()

        # One minute inside the threshold: expires in 4 minutes (240 s).
        remaining = self.REFRESH_THRESHOLD - 60
        manager._access_token = "old-token"
        manager._token_expiry = time.time() + remaining

        assert manager._should_refresh_token() is True

    def test_token_not_refreshed_when_more_than_5_minutes_remaining(
        self, temp_dir: Path, mock_env_vars: dict
    ) -> None:
        """Test that token is not refreshed when more than 5 minutes remaining."""
        manager = self._make_manager()

        # Five minutes beyond the threshold: expires in 10 minutes (600 s).
        remaining = self.REFRESH_THRESHOLD + 300
        manager._access_token = "valid-token"
        manager._token_expiry = time.time() + remaining

        assert manager._should_refresh_token() is False