"""Authentication manager for JIRA API access."""
import logging
from typing import Optional, Union
from requests.auth import HTTPBasicAuth, AuthBase
from mcp_jira.auth.adfs_auth import ADFSAuthenticator
from mcp_jira.auth.kerberos_auth import KerberosAuthenticator
from mcp_jira.config import AuthMethod, Config
logger = logging.getLogger(__name__)
class AuthManager:
    """Select, build and cache the auth handler used for JIRA API requests.

    The handler is chosen from the configured authentication method the
    first time :meth:`get_auth` is called and is reused afterwards until
    :meth:`refresh_credentials` clears it.
    """

    def __init__(self, config: Config):
        """Store the configuration and start with nothing cached.

        Args:
            config: Application configuration. Its ``auth_method`` is read
                here and kept on the instance.
        """
        self.config = config
        self.auth_method = config.auth_method
        # Authenticators and the resolved handler are created on first use.
        self._kerberos_auth: Optional[KerberosAuthenticator] = None
        self._adfs_auth: Optional[ADFSAuthenticator] = None
        self._auth: Optional[Union[AuthBase, tuple]] = None

    def _get_kerberos_auth(self) -> AuthBase:
        """Return the handler supplied by the Kerberos authenticator.

        The authenticator is constructed once and kept on the instance.
        """
        authenticator = self._kerberos_auth
        if authenticator is None:
            authenticator = KerberosAuthenticator(self.config)
            self._kerberos_auth = authenticator
        return authenticator.get_auth()

    def _get_api_token_auth(self) -> HTTPBasicAuth:
        """Return HTTP Basic auth built from the configured email and token.

        Raises:
            ValueError: If the email or the API token is missing or empty.
        """
        settings = self.config
        if not (settings.jira_email and settings.jira_api_token):
            raise ValueError(
                "Email and API token are required for API token authentication"
            )
        return HTTPBasicAuth(settings.jira_email, settings.jira_api_token)

    def _get_basic_auth(self) -> HTTPBasicAuth:
        """Return HTTP Basic auth built from the configured username/password.

        Raises:
            ValueError: If the username or the password is missing or empty.
        """
        settings = self.config
        if not (settings.jira_username and settings.jira_password):
            raise ValueError(
                "Username and password are required for basic authentication"
            )
        return HTTPBasicAuth(settings.jira_username, settings.jira_password)

    def _get_adfs_auth(self) -> AuthBase:
        """Return the requests auth object supplied by the ADFS authenticator.

        The authenticator is constructed once and kept on the instance.
        """
        authenticator = self._adfs_auth
        if authenticator is None:
            authenticator = ADFSAuthenticator(self.config)
            self._adfs_auth = authenticator
        # The ADFS flow authenticates via a Kerberos auth object (SSO).
        return authenticator.get_requests_auth()

    def _create_auth(self) -> Union[AuthBase, tuple]:
        """Build a new handler for the configured method, logging the choice.

        Raises:
            ValueError: If the method is not one of the supported ones, or
                if the credentials required by the method are missing.
        """
        method = self.auth_method
        if method == AuthMethod.KERBEROS:
            logger.info("Using Kerberos authentication")
            return self._get_kerberos_auth()
        if method == AuthMethod.ADFS:
            logger.info("Using ADFS authentication with Kerberos SSO")
            return self._get_adfs_auth()
        if method == AuthMethod.API_TOKEN:
            logger.info("Using API token authentication")
            return self._get_api_token_auth()
        if method == AuthMethod.BASIC:
            logger.warning(
                "Using basic authentication (not recommended for production)"
            )
            return self._get_basic_auth()
        raise ValueError(f"Unsupported authentication method: {self.auth_method}")

    def get_auth(self) -> Union[AuthBase, tuple]:
        """Return the auth handler for the configured method.

        The handler is created on the first call and served from the
        instance cache on later calls.

        Returns:
            Authentication handler for the requests library.

        Raises:
            ValueError: If authentication configuration is invalid.
        """
        cached = self._auth
        if cached is not None:
            return cached
        handler = self._create_auth()
        self._auth = handler
        return handler

    def refresh_credentials(self) -> None:
        """Refresh credentials when the active method supports it.

        Only Kerberos is handled, and only once its authenticator has been
        created; in every other case this is a no-op.
        """
        uses_kerberos = self.auth_method == AuthMethod.KERBEROS
        if not (uses_kerberos and self._kerberos_auth):
            return
        logger.info("Refreshing Kerberos credentials")
        self._kerberos_auth.refresh_ticket()
        # Drop the cached handler so the next get_auth() builds a fresh one.
        self._auth = None