Skip to main content
Glama
adfs_auth.py7.9 kB
"""ADFS token authentication using Kerberos credentials.""" import logging import os from typing import Optional from xml.etree import ElementTree as ET import requests from requests_gssapi import HTTPKerberosAuth, REQUIRED, OPTIONAL from mcp_jira.config import Config logger = logging.getLogger(__name__) class ADFSAuthenticator: """Handles ADFS token generation using Kerberos SSO.""" def __init__(self, config: Config): """Initialize ADFS authenticator. Args: config: Application configuration """ self.config = config self.adfs_url = config.adfs_url if hasattr(config, 'adfs_url') else None self.jira_url = config.jira_url self._token: Optional[str] = None self._kerberos_auth: Optional[HTTPKerberosAuth] = None def _get_kerberos_auth(self) -> HTTPKerberosAuth: """Get Kerberos authentication handler. Returns: HTTPKerberosAuth instance """ if self._kerberos_auth is None: mutual_auth = REQUIRED if self.config.kerberos_mutual_auth else OPTIONAL self._kerberos_auth = HTTPKerberosAuth( mutual_authentication=mutual_auth, sanitize_mutual_error_response=False, ) return self._kerberos_auth def get_adfs_token(self) -> Optional[str]: """Get ADFS token using Kerberos credentials from ccache. This method uses the Kerberos credential cache (KRB5CCNAME) to authenticate with ADFS and retrieve a SAML or OAuth token. Returns: ADFS token string or None if unable to obtain Raises: RuntimeError: If token acquisition fails """ if self._token: return self._token try: # Method 1: Direct Kerberos authentication to JIRA # If JIRA supports Kerberos/SPNEGO directly, use that logger.info("Attempting direct Kerberos authentication to JIRA") kerberos_auth = self._get_kerberos_auth() # Test authentication by making a simple request response = requests.get( f"{self.jira_url}/rest/api/3/myself", auth=kerberos_auth, timeout=self.config.request_timeout, ) if response.status_code == 200: logger.info("Successfully authenticated with Kerberos/SPNEGO") # For direct Kerberos auth, we don't need a separate token # The HTTPKerberosAuth handler will manage the authentication self._token = "KERBEROS_SSO" return self._token # Method 2: ADFS SAML token acquisition (if ADFS URL is configured) if self.adfs_url: logger.info(f"Attempting ADFS token acquisition from {self.adfs_url}") return self._get_saml_token_from_adfs() raise RuntimeError( f"Failed to authenticate. Response: {response.status_code}" ) except requests.exceptions.RequestException as e: logger.error(f"Failed to obtain ADFS token: {e}") raise RuntimeError(f"ADFS token acquisition failed: {e}") from e def _get_saml_token_from_adfs(self) -> Optional[str]: """Get SAML token from ADFS endpoint. Returns: SAML token string or None """ try: kerberos_auth = self._get_kerberos_auth() # ADFS WS-Trust endpoint (adjust based on your ADFS configuration) wstrust_endpoint = f"{self.adfs_url}/adfs/services/trust/13/windowstransport" # WS-Trust request envelope request_envelope = f"""<?xml version="1.0" encoding="UTF-8"?> <s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope" xmlns:a="http://www.w3.org/2005/08/addressing" xmlns:u="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"> <s:Header> <a:Action s:mustUnderstand="1">http://docs.oasis-open.org/ws-sx/ws-trust/200512/RST/Issue</a:Action> <a:To s:mustUnderstand="1">{wstrust_endpoint}</a:To> </s:Header> <s:Body> <trust:RequestSecurityToken xmlns:trust="http://docs.oasis-open.org/ws-sx/ws-trust/200512"> <wsp:AppliesTo xmlns:wsp="http://schemas.xmlsoap.org/ws/2004/09/policy"> <a:EndpointReference> <a:Address>{self.jira_url}</a:Address> </a:EndpointReference> </wsp:AppliesTo> <trust:RequestType>http://docs.oasis-open.org/ws-sx/ws-trust/200512/Issue</trust:RequestType> </trust:RequestSecurityToken> </s:Body> </s:Envelope>""" response = requests.post( wstrust_endpoint, data=request_envelope, auth=kerberos_auth, headers={"Content-Type": "application/soap+xml; charset=utf-8"}, timeout=self.config.request_timeout, ) if response.status_code == 200: # Parse SAML token from response token = self._extract_saml_token(response.text) if token: logger.info("Successfully obtained SAML token from ADFS") self._token = token return token logger.warning(f"ADFS token request failed: {response.status_code}") return None except Exception as e: logger.error(f"Error getting SAML token from ADFS: {e}") return None def _extract_saml_token(self, soap_response: str) -> Optional[str]: """Extract SAML assertion from SOAP response. Args: soap_response: SOAP response XML Returns: SAML token string or None """ try: # Parse the SOAP response root = ET.fromstring(soap_response) # Find SAML assertion (namespace-aware) namespaces = { 's': 'http://www.w3.org/2003/05/soap-envelope', 'trust': 'http://docs.oasis-open.org/ws-sx/ws-trust/200512', 'saml': 'urn:oasis:names:tc:SAML:2.0:assertion', } # Look for RequestedSecurityToken token_element = root.find('.//trust:RequestedSecurityToken', namespaces) if token_element is not None: # Get the SAML assertion saml_assertion = token_element.find('.//saml:Assertion', namespaces) if saml_assertion is not None: return ET.tostring(saml_assertion, encoding='unicode') return None except ET.ParseError as e: logger.error(f"Failed to parse SAML response: {e}") return None def get_auth_header(self) -> dict: """Get authentication header for JIRA requests. Returns: Dictionary with authentication headers """ token = self.get_adfs_token() if token == "KERBEROS_SSO": # For direct Kerberos auth, return empty dict # The requests library will use the auth parameter instead return {} elif token: # For SAML token, use Bearer authentication return {"Authorization": f"Bearer {token}"} return {} def get_requests_auth(self) -> Optional[HTTPKerberosAuth]: """Get requests auth object for Kerberos authentication. Returns: HTTPKerberosAuth instance or None """ if self._token == "KERBEROS_SSO": return self._get_kerberos_auth() return None def clear_token(self) -> None: """Clear cached token to force refresh.""" self._token = None logger.info("ADFS token cache cleared")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tarangbhavsar/mcp-jira-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server