"""Configuration settings module for SSO MCP Server.
Loads configuration from environment variables with validation.
Supports dual-mode authentication: LOCAL (OAuth client) and CLOUD (Resource Server).
"""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import ClassVar
from dotenv import load_dotenv
class AuthMode(Enum):
"""Authentication mode for the MCP server.
LOCAL: Server acts as OAuth client, acquires tokens via browser SSO.
CLOUD: Server acts as Resource Server, validates incoming Bearer tokens.
AUTO: Automatically detect mode based on request context.
"""
LOCAL = "local"
CLOUD = "cloud"
AUTO = "auto"
class ConfigurationError(Exception):
"""Raised when configuration is invalid or missing.
Attributes:
message: Description of the configuration error.
action: Actionable guidance on how to fix the error.
"""
def __init__(self, message: str, action: str | None = None) -> None:
"""Initialize ConfigurationError.
Args:
message: Description of the configuration error.
action: Actionable guidance on how to fix the error.
"""
self.message = message
self.action = action
super().__init__(message)
def __str__(self) -> str:
"""Return formatted error message with action guidance."""
if self.action:
return f"{self.message}\n Action: {self.action}"
return self.message
@dataclass
class Settings:
"""Application settings loaded from environment variables.
Supports dual-mode authentication:
- LOCAL mode: Server acts as OAuth client (requires azure_client_id, azure_tenant_id)
- CLOUD mode: Server acts as Resource Server (requires resource_identifier, allowed_issuers)
Attributes:
auth_mode: Authentication mode (LOCAL, CLOUD, AUTO).
azure_client_id: Azure App Registration client ID (LOCAL mode).
azure_tenant_id: Azure tenant ID (LOCAL mode).
resource_identifier: This server's resource URL for audience validation (CLOUD mode).
allowed_issuers: List of allowed token issuers (CLOUD mode).
jwks_cache_ttl: JWKS cache time-to-live in seconds (CLOUD mode).
scopes_supported: List of supported scopes to advertise (CLOUD mode).
checklist_dir: Path to checklist files directory.
process_dir: Path to process files directory.
mcp_port: Port for MCP HTTP server.
log_level: Logging level (DEBUG, INFO, WARNING, ERROR).
token_cache_path: Path to encrypted token cache file (LOCAL mode).
"""
# Authentication mode
auth_mode: AuthMode = AuthMode.LOCAL
# LOCAL mode settings (OAuth client)
azure_client_id: str = ""
azure_tenant_id: str = ""
token_cache_path: Path = field(
default_factory=lambda: Path.home() / ".sso-mcp-server" / "token_cache.bin"
)
# CLOUD mode settings (Resource Server)
resource_identifier: str = ""
allowed_issuers: list[str] = field(default_factory=list)
jwks_cache_ttl: int = 3600
scopes_supported: list[str] = field(default_factory=list)
# Common settings
checklist_dir: Path = field(default_factory=lambda: Path("./checklists"))
process_dir: Path = field(default_factory=lambda: Path("./processes"))
mcp_port: int = 8080
log_level: str = "INFO"
# Valid log levels
VALID_LOG_LEVELS: ClassVar[set[str]] = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
def __post_init__(self) -> None:
"""Validate settings after initialization."""
self._validate()
def _validate(self) -> None:
"""Validate all settings.
Validates mode-specific requirements:
- LOCAL mode: requires azure_client_id and azure_tenant_id
- CLOUD mode: requires resource_identifier and allowed_issuers
- AUTO mode: allows either configuration
Raises:
ConfigurationError: If any setting is invalid.
"""
if self.auth_mode == AuthMode.LOCAL:
self._validate_local_mode()
elif self.auth_mode == AuthMode.CLOUD:
self._validate_cloud_mode()
self._validate_common()
def _validate_local_mode(self) -> None:
"""Validate LOCAL mode requirements."""
if not self.azure_client_id:
raise ConfigurationError(
"AZURE_CLIENT_ID is required for LOCAL auth mode",
action="Set AZURE_CLIENT_ID environment variable or add it to .env file. "
"Get this from Azure Portal > App registrations > Application (client) ID.",
)
if not self.azure_tenant_id:
raise ConfigurationError(
"AZURE_TENANT_ID is required for LOCAL auth mode",
action="Set AZURE_TENANT_ID environment variable or add it to .env file. "
"Get this from Azure Portal > App registrations > Directory (tenant) ID.",
)
def _validate_cloud_mode(self) -> None:
"""Validate CLOUD mode requirements."""
if not self.resource_identifier:
raise ConfigurationError(
"RESOURCE_IDENTIFIER is required for CLOUD auth mode",
action=(
"Set RESOURCE_IDENTIFIER to this server's URL "
"(e.g., https://mcp.example.com). "
"This is used for token audience validation (RFC 8707)."
),
)
if not self.allowed_issuers:
raise ConfigurationError(
"ALLOWED_ISSUERS is required for CLOUD auth mode",
action="Set ALLOWED_ISSUERS to comma-separated list of trusted token issuers. "
"Example: https://login.microsoftonline.com/your-tenant/v2.0",
)
if not self.resource_identifier.startswith(("http://", "https://")):
raise ConfigurationError(
f"RESOURCE_IDENTIFIER must be a valid URL, got: {self.resource_identifier}",
action="Set RESOURCE_IDENTIFIER to a full URL including scheme (https://...).",
)
for issuer in self.allowed_issuers:
if not issuer.startswith(("http://", "https://")):
raise ConfigurationError(
f"ALLOWED_ISSUERS must contain valid URLs, got: {issuer}",
action="Ensure all issuers are full URLs including scheme (https://...).",
)
def _validate_common(self) -> None:
"""Validate common settings for all modes."""
if self.jwks_cache_ttl < 0:
raise ConfigurationError(
f"JWKS_CACHE_TTL must be non-negative, got {self.jwks_cache_ttl}",
action="Set JWKS_CACHE_TTL to a positive number of seconds (default: 3600).",
)
if not self.checklist_dir.exists():
raise ConfigurationError(
f"CHECKLIST_DIR does not exist: {self.checklist_dir}",
action=f"Create the directory: mkdir -p {self.checklist_dir}",
)
if not self.checklist_dir.is_dir():
raise ConfigurationError(
f"CHECKLIST_DIR is not a directory: {self.checklist_dir}",
action="Ensure CHECKLIST_DIR points to a directory, not a file.",
)
# Process directory validation - lenient (doesn't fail if missing)
if self.process_dir.exists() and not self.process_dir.is_dir():
raise ConfigurationError(
f"PROCESS_DIR is not a directory: {self.process_dir}",
action="Ensure PROCESS_DIR points to a directory, not a file.",
)
if not 1024 <= self.mcp_port <= 65535:
raise ConfigurationError(
f"MCP_PORT must be between 1024 and 65535, got {self.mcp_port}",
action="Use a port number between 1024 and 65535. Default is 8080.",
)
if self.log_level.upper() not in self.VALID_LOG_LEVELS:
raise ConfigurationError(
f"LOG_LEVEL must be one of {sorted(self.VALID_LOG_LEVELS)}, got {self.log_level}",
action="Set LOG_LEVEL to one of: DEBUG, INFO, WARNING, ERROR, CRITICAL.",
)
@classmethod
def from_env(cls, env_file: Path | None = None) -> Settings:
"""Load settings from environment variables.
Args:
env_file: Optional path to .env file to load.
Returns:
Settings instance with values from environment.
Raises:
ConfigurationError: If required environment variables are missing or invalid.
"""
if env_file:
load_dotenv(env_file)
else:
load_dotenv()
# Parse auth mode
auth_mode_str = os.getenv("AUTH_MODE", "local").lower()
try:
auth_mode = AuthMode(auth_mode_str)
except ValueError as e:
valid_modes = [m.value for m in AuthMode]
raise ConfigurationError(
f"AUTH_MODE must be one of {valid_modes}, got '{auth_mode_str}'",
action="Set AUTH_MODE to 'local', 'cloud', or 'auto'.",
) from e
# Parse checklist directory
checklist_dir_str = os.getenv("CHECKLIST_DIR", "./checklists")
checklist_dir = Path(checklist_dir_str).resolve()
# Parse process directory
process_dir_str = os.getenv("PROCESS_DIR", "./processes")
process_dir = Path(process_dir_str).resolve()
# Parse MCP port
mcp_port_str = os.getenv("MCP_PORT", "8080")
try:
mcp_port = int(mcp_port_str)
except ValueError as e:
raise ConfigurationError(
f"MCP_PORT must be an integer, got '{mcp_port_str}'",
action="Set MCP_PORT to a valid integer port number (e.g., 8080).",
) from e
# Parse token cache path (LOCAL mode)
token_cache_str = os.getenv("TOKEN_CACHE_PATH")
if token_cache_str:
token_cache_path = Path(token_cache_str).expanduser()
else:
token_cache_path = Path.home() / ".sso-mcp-server" / "token_cache.bin"
# Parse allowed issuers (CLOUD mode) - comma-separated
allowed_issuers_str = os.getenv("ALLOWED_ISSUERS", "")
allowed_issuers = [
issuer.strip() for issuer in allowed_issuers_str.split(",") if issuer.strip()
]
# Parse supported scopes (CLOUD mode) - comma-separated
scopes_supported_str = os.getenv("SCOPES_SUPPORTED", "")
scopes_supported = [
scope.strip() for scope in scopes_supported_str.split(",") if scope.strip()
]
# Parse JWKS cache TTL
jwks_cache_ttl_str = os.getenv("JWKS_CACHE_TTL", "3600")
try:
jwks_cache_ttl = int(jwks_cache_ttl_str)
except ValueError as e:
raise ConfigurationError(
f"JWKS_CACHE_TTL must be an integer, got '{jwks_cache_ttl_str}'",
action="Set JWKS_CACHE_TTL to a valid integer (seconds).",
) from e
return cls(
auth_mode=auth_mode,
azure_client_id=os.getenv("AZURE_CLIENT_ID", ""),
azure_tenant_id=os.getenv("AZURE_TENANT_ID", ""),
token_cache_path=token_cache_path,
resource_identifier=os.getenv("RESOURCE_IDENTIFIER", ""),
allowed_issuers=allowed_issuers,
jwks_cache_ttl=jwks_cache_ttl,
scopes_supported=scopes_supported,
checklist_dir=checklist_dir,
process_dir=process_dir,
mcp_port=mcp_port,
log_level=os.getenv("LOG_LEVEL", "INFO").upper(),
)
# Global settings instance (lazily initialized)
_settings: Settings | None = None
def get_settings() -> Settings:
"""Get the global settings instance.
Returns:
Settings instance loaded from environment.
"""
global _settings # noqa: PLW0603
if _settings is None:
_settings = Settings.from_env()
return _settings
def reset_settings() -> None:
"""Reset the global settings instance (for testing)."""
global _settings # noqa: PLW0603
_settings = None