Skip to main content
Glama
kerberos_auth.py (3.86 kB)
"""Kerberos authentication using GSSAPI.""" import logging import os import subprocess from typing import Optional from requests_gssapi import HTTPKerberosAuth, OPTIONAL, REQUIRED from mcp_jira.config import Config logger = logging.getLogger(__name__) class KerberosAuthenticator: """Handles Kerberos/GSSAPI authentication for JIRA.""" def __init__(self, config: Config): """Initialize Kerberos authenticator. Args: config: Application configuration """ self.config = config self.principal = config.kerberos_principal self.keytab_path = config.kerberos_keytab_path self.mutual_auth = REQUIRED if config.kerberos_mutual_auth else OPTIONAL self._auth: Optional[HTTPKerberosAuth] = None def _init_keytab(self) -> None: """Initialize Kerberos using keytab file if provided.""" if not self.keytab_path: logger.info("No keytab path provided, assuming kinit has been run") return if not os.path.exists(self.keytab_path): raise FileNotFoundError(f"Keytab file not found: {self.keytab_path}") try: # Use kinit with keytab to get initial ticket cmd = ["kinit", "-kt", self.keytab_path] if self.principal: cmd.append(self.principal) logger.info(f"Initializing Kerberos ticket using keytab: {self.keytab_path}") result = subprocess.run( cmd, capture_output=True, text=True, check=True, timeout=10 ) if result.returncode != 0: raise RuntimeError( f"Failed to initialize Kerberos ticket: {result.stderr}" ) logger.info("Successfully initialized Kerberos ticket") except subprocess.CalledProcessError as e: raise RuntimeError( f"Failed to run kinit: {e.stderr if e.stderr else str(e)}" ) from e except subprocess.TimeoutExpired as e: raise RuntimeError("kinit command timed out") from e def verify_ticket(self) -> bool: """Verify that a valid Kerberos ticket exists. 
Returns: True if valid ticket exists, False otherwise """ try: result = subprocess.run( ["klist", "-s"], capture_output=True, timeout=5 ) return result.returncode == 0 except (subprocess.SubprocessError, FileNotFoundError): logger.warning("Could not verify Kerberos ticket (klist not available)") return False def get_auth(self) -> HTTPKerberosAuth: """Get HTTPKerberosAuth instance for requests. Returns: HTTPKerberosAuth instance configured for this authenticator Raises: RuntimeError: If authentication setup fails """ if self._auth is None: # Initialize keytab if provided self._init_keytab() # Verify we have a ticket (optional check) if not self.verify_ticket(): logger.warning( "No valid Kerberos ticket found. " "Authentication may fail if ticket is required." ) # Create auth handler self._auth = HTTPKerberosAuth( mutual_authentication=self.mutual_auth, sanitize_mutual_error_response=False, ) logger.info("Kerberos authentication configured successfully") return self._auth def refresh_ticket(self) -> None: """Refresh the Kerberos ticket if using keytab.""" if self.keytab_path: logger.info("Refreshing Kerberos ticket") self._init_keytab() # Reset auth to force recreation self._auth = None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tarangbhavsar/mcp-jira-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.