"""ADFS token authentication using Kerberos credentials."""
import logging
import os
from typing import Optional
from xml.etree import ElementTree as ET
from xml.sax.saxutils import escape

import requests
from requests_gssapi import HTTPKerberosAuth, REQUIRED, OPTIONAL

from mcp_jira.config import Config

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ADFSAuthenticator:
    """Handles ADFS token generation using Kerberos SSO.

    Two strategies are tried, in order:

    1. Direct Kerberos/SPNEGO against JIRA. On success no token is needed;
       the sentinel ``"KERBEROS_SSO"`` is cached and callers should attach
       the object returned by :meth:`get_requests_auth` to their requests.
    2. If that request does not return HTTP 200 and ``adfs_url`` is
       configured, a WS-Trust ``Issue`` request is sent to ADFS and the SAML
       assertion found in the response is cached as the token.
    """

    # Sentinel cached in ``_token`` when JIRA accepted SPNEGO directly.
    _KERBEROS_SSO = "KERBEROS_SSO"

    # Namespace prefixes used when searching the WS-Trust response.
    # "saml1" covers SAML 1.0/1.1 assertions, which share one namespace.
    _XML_NAMESPACES = {
        "trust": "http://docs.oasis-open.org/ws-sx/ws-trust/200512",
        "saml2": "urn:oasis:names:tc:SAML:2.0:assertion",
        "saml1": "urn:oasis:names:tc:SAML:1.0:assertion",
    }

    # Assertion lookups in order of preference (SAML 2.0 first, as before).
    _ASSERTION_PATHS = (".//saml2:Assertion", ".//saml1:Assertion")

    # WS-Trust 1.3 RequestSecurityToken envelope. ``{endpoint}`` and
    # ``{applies_to}`` must be XML-escaped before substitution.
    _RST_ENVELOPE_TEMPLATE = """<?xml version="1.0" encoding="UTF-8"?>
<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"
xmlns:a="http://www.w3.org/2005/08/addressing"
xmlns:u="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">
<s:Header>
<a:Action s:mustUnderstand="1">http://docs.oasis-open.org/ws-sx/ws-trust/200512/RST/Issue</a:Action>
<a:To s:mustUnderstand="1">{endpoint}</a:To>
</s:Header>
<s:Body>
<trust:RequestSecurityToken xmlns:trust="http://docs.oasis-open.org/ws-sx/ws-trust/200512">
<wsp:AppliesTo xmlns:wsp="http://schemas.xmlsoap.org/ws/2004/09/policy">
<a:EndpointReference>
<a:Address>{applies_to}</a:Address>
</a:EndpointReference>
</wsp:AppliesTo>
<trust:RequestType>http://docs.oasis-open.org/ws-sx/ws-trust/200512/Issue</trust:RequestType>
</trust:RequestSecurityToken>
</s:Body>
</s:Envelope>"""

    def __init__(self, config: "Config"):
        """Initialize ADFS authenticator.

        Args:
            config: Application configuration. ``jira_url``,
                ``kerberos_mutual_auth`` and ``request_timeout`` are read
                from it; ``adfs_url`` is optional.
        """
        self.config = config
        # ADFS is optional: a config without the attribute disables the
        # SAML fallback.
        self.adfs_url = getattr(config, "adfs_url", None)
        self.jira_url = config.jira_url
        self._token: Optional[str] = None
        self._kerberos_auth: Optional["HTTPKerberosAuth"] = None

    @staticmethod
    def _join_url(base: str, path: str) -> str:
        """Join ``base`` and ``path`` with exactly one slash between them.

        Args:
            base: Base URL, with or without trailing slashes.
            path: Path to append, with or without leading slashes.

        Returns:
            The combined URL.
        """
        # str() tolerates non-str values the same way the f-string
        # interpolation this helper replaces did.
        return f"{str(base).rstrip('/')}/{str(path).lstrip('/')}"

    @classmethod
    def _build_rst_envelope(cls, endpoint: str, applies_to: str) -> str:
        """Build the WS-Trust RequestSecurityToken SOAP envelope.

        Both values are XML-escaped so that characters such as ``&`` or
        ``<`` in a URL cannot produce a malformed document.

        Args:
            endpoint: WS-Trust endpoint URL, placed in ``<a:To>``.
            applies_to: Relying-party address, placed in ``<a:Address>``.

        Returns:
            The SOAP envelope as a string.
        """
        return cls._RST_ENVELOPE_TEMPLATE.format(
            endpoint=escape(str(endpoint)),
            applies_to=escape(str(applies_to)),
        )

    def _get_kerberos_auth(self) -> "HTTPKerberosAuth":
        """Get Kerberos authentication handler.

        The handler is created on first use and then reused.

        Returns:
            HTTPKerberosAuth instance
        """
        if self._kerberos_auth is None:
            mutual_auth = REQUIRED if self.config.kerberos_mutual_auth else OPTIONAL
            self._kerberos_auth = HTTPKerberosAuth(
                mutual_authentication=mutual_auth,
                sanitize_mutual_error_response=False,
            )
        return self._kerberos_auth

    def get_adfs_token(self) -> Optional[str]:
        """Get an authentication token using Kerberos credentials.

        A cached token is returned if present. Otherwise JIRA is probed with
        Kerberos/SPNEGO; on HTTP 200 the ``"KERBEROS_SSO"`` sentinel is
        cached and returned. On any other status, and only if ``adfs_url``
        is set, a SAML token is requested from ADFS.

        Returns:
            The ``"KERBEROS_SSO"`` sentinel, a SAML assertion string, or
            None when the ADFS fallback was attempted but yielded no token.

        Raises:
            RuntimeError: If the probe request to JIRA fails at the
                transport level, or if it returns a non-200 status and no
                ADFS URL is configured.
        """
        if self._token:
            return self._token

        # Method 1: direct Kerberos/SPNEGO authentication to JIRA.
        logger.info("Attempting direct Kerberos authentication to JIRA")
        kerberos_auth = self._get_kerberos_auth()
        try:
            response = requests.get(
                self._join_url(self.jira_url, "/rest/api/3/myself"),
                auth=kerberos_auth,
                timeout=self.config.request_timeout,
            )
        except requests.exceptions.RequestException as e:
            logger.error("Failed to obtain ADFS token: %s", e)
            raise RuntimeError(f"ADFS token acquisition failed: {e}") from e

        if response.status_code == 200:
            logger.info("Successfully authenticated with Kerberos/SPNEGO")
            # No separate token is needed; the HTTPKerberosAuth handler
            # performs the authentication on each request.
            self._token = self._KERBEROS_SSO
            return self._token

        # Method 2: ADFS SAML token acquisition (if ADFS URL is configured).
        if self.adfs_url:
            logger.info("Attempting ADFS token acquisition from %s", self.adfs_url)
            return self._get_saml_token_from_adfs()

        raise RuntimeError(
            f"Failed to authenticate. Response: {response.status_code}"
        )

    def _get_saml_token_from_adfs(self) -> Optional[str]:
        """Get SAML token from the ADFS WS-Trust endpoint.

        This is a best-effort fallback: every failure is logged and reported
        as None rather than raised.

        Returns:
            SAML token string (also cached on the instance) or None
        """
        try:
            # ADFS WS-Trust endpoint (adjust based on your ADFS configuration)
            wstrust_endpoint = self._join_url(
                self.adfs_url, "/adfs/services/trust/13/windowstransport"
            )
            request_envelope = self._build_rst_envelope(wstrust_endpoint, self.jira_url)

            response = requests.post(
                wstrust_endpoint,
                # Encode explicitly so the bytes on the wire match the
                # charset declared in the Content-Type header.
                data=request_envelope.encode("utf-8"),
                auth=self._get_kerberos_auth(),
                headers={"Content-Type": "application/soap+xml; charset=utf-8"},
                timeout=self.config.request_timeout,
            )

            if response.status_code != 200:
                logger.warning("ADFS token request failed: %s", response.status_code)
                return None

            token = self._extract_saml_token(response.text)
            if not token:
                logger.warning("ADFS response did not contain a SAML assertion")
                return None

            logger.info("Successfully obtained SAML token from ADFS")
            self._token = token
            return token
        except Exception as e:
            # Deliberately broad: the fallback must never raise.
            logger.error("Error getting SAML token from ADFS: %s", e)
            return None

    def _extract_saml_token(self, soap_response: str) -> Optional[str]:
        """Extract SAML assertion from SOAP response.

        The first ``RequestedSecurityToken`` element is searched for a SAML
        2.0 assertion, then for a SAML 1.0/1.1 assertion.

        Args:
            soap_response: SOAP response XML

        Returns:
            The serialized assertion element, or None if the response is not
            well-formed XML or contains no assertion.
        """
        try:
            root = ET.fromstring(soap_response)
        except ET.ParseError as e:
            logger.error("Failed to parse SAML response: %s", e)
            return None

        token_element = root.find(
            ".//trust:RequestedSecurityToken", self._XML_NAMESPACES
        )
        if token_element is None:
            return None

        for assertion_path in self._ASSERTION_PATHS:
            saml_assertion = token_element.find(assertion_path, self._XML_NAMESPACES)
            if saml_assertion is not None:
                return ET.tostring(saml_assertion, encoding="unicode")
        return None

    def get_auth_header(self) -> dict:
        """Get authentication header for JIRA requests.

        Returns:
            ``{}`` when no token is available or when direct Kerberos auth
            is in use (the requests ``auth`` parameter is used instead);
            otherwise a dict with an ``Authorization: Bearer`` header.
        """
        token = self.get_adfs_token()
        if not token or token == self._KERBEROS_SSO:
            return {}
        # NOTE(review): the SAML assertion is sent as raw XML in the header;
        # confirm the server expects this form rather than an encoded one.
        return {"Authorization": f"Bearer {token}"}

    def get_requests_auth(self) -> Optional["HTTPKerberosAuth"]:
        """Get requests auth object for Kerberos authentication.

        Only the cached token is inspected; no authentication is attempted
        here, so call :meth:`get_adfs_token` first.

        Returns:
            HTTPKerberosAuth instance when direct Kerberos auth is in use,
            otherwise None
        """
        if self._token == self._KERBEROS_SSO:
            return self._get_kerberos_auth()
        return None

    def clear_token(self) -> None:
        """Clear cached token to force refresh."""
        self._token = None
        logger.info("ADFS token cache cleared")