"""Kerberos authentication using GSSAPI."""
import logging
import os
import subprocess
from typing import Optional
from requests_gssapi import HTTPKerberosAuth, OPTIONAL, REQUIRED
from mcp_jira.config import Config
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class KerberosAuthenticator:
    """Handles Kerberos/GSSAPI authentication for JIRA.

    Optionally obtains a ticket from a keytab by running ``kinit`` and hands
    out a lazily created, cached ``HTTPKerberosAuth`` handler for use with
    ``requests``.
    """

    # Upper bounds (in seconds) on how long the external Kerberos tools may run.
    _KINIT_TIMEOUT_SECONDS = 10
    _KLIST_TIMEOUT_SECONDS = 5

    def __init__(self, config: Config):
        """Initialize Kerberos authenticator.

        Args:
            config: Application configuration. The attributes
                ``kerberos_principal``, ``kerberos_keytab_path`` and
                ``kerberos_mutual_auth`` are read from it.
        """
        self.config = config
        self.principal = config.kerberos_principal
        self.keytab_path = config.kerberos_keytab_path
        # A truthy setting demands mutual authentication; otherwise it is optional.
        self.mutual_auth = REQUIRED if config.kerberos_mutual_auth else OPTIONAL
        # Created on first use by get_auth().
        self._auth: Optional[HTTPKerberosAuth] = None

    def _init_keytab(self) -> None:
        """Obtain a Kerberos ticket from the configured keytab, if any.

        When no keytab path is configured this only logs a message and
        returns; a ticket is then assumed to have been obtained already.

        Raises:
            FileNotFoundError: If the keytab path does not exist. The
                ``subprocess`` module raises the same error type when the
                ``kinit`` executable cannot be found.
            RuntimeError: If ``kinit`` exits with a non-zero status or does
                not finish within the timeout.
        """
        if not self.keytab_path:
            logger.info("No keytab path provided, assuming kinit has been run")
            return

        if not os.path.exists(self.keytab_path):
            raise FileNotFoundError(f"Keytab file not found: {self.keytab_path}")

        # Use kinit with the keytab to get an initial ticket; the principal
        # is appended only when one is configured.
        cmd = ["kinit", "-kt", self.keytab_path]
        if self.principal:
            cmd.append(self.principal)

        logger.info(
            "Initializing Kerberos ticket using keytab: %s", self.keytab_path
        )
        try:
            # check=True converts any non-zero exit status into
            # CalledProcessError, so no separate returncode test is needed.
            subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=True,
                timeout=self._KINIT_TIMEOUT_SECONDS,
            )
        except subprocess.CalledProcessError as e:
            raise RuntimeError(
                f"Failed to run kinit: {e.stderr if e.stderr else str(e)}"
            ) from e
        except subprocess.TimeoutExpired as e:
            raise RuntimeError("kinit command timed out") from e

        logger.info("Successfully initialized Kerberos ticket")

    def verify_ticket(self) -> bool:
        """Verify that a valid Kerberos ticket exists.

        Runs ``klist -s`` and inspects only its exit status. This is a
        best-effort check: failure to run ``klist`` is logged, not raised.

        Returns:
            True if ``klist -s`` exits with status 0, False otherwise
            (including when ``klist`` cannot be run or times out).
        """
        try:
            result = subprocess.run(
                ["klist", "-s"],
                capture_output=True,
                timeout=self._KLIST_TIMEOUT_SECONDS,
            )
        except subprocess.TimeoutExpired:
            # klist exists but did not answer in time; say so rather than
            # claiming the tool is missing.
            logger.warning("Could not verify Kerberos ticket (klist timed out)")
            return False
        except (subprocess.SubprocessError, OSError):
            # OSError covers a missing executable (FileNotFoundError) as well
            # as e.g. a non-executable klist (PermissionError).
            logger.warning("Could not verify Kerberos ticket (klist not available)")
            return False
        return result.returncode == 0

    def get_auth(self) -> HTTPKerberosAuth:
        """Get HTTPKerberosAuth instance for requests.

        The handler is created on the first call and cached; later calls
        return the same object without re-running ``kinit`` or ``klist``.

        Returns:
            HTTPKerberosAuth instance configured for this authenticator

        Raises:
            FileNotFoundError: If a keytab is configured but the file (or the
                ``kinit`` executable) cannot be found.
            RuntimeError: If ``kinit`` fails or times out.
        """
        if self._auth is None:
            # Initialize from the keytab if one is configured.
            self._init_keytab()

            # The ticket check is advisory only: a missing ticket produces a
            # warning, and the handler is still created.
            if not self.verify_ticket():
                logger.warning(
                    "No valid Kerberos ticket found. "
                    "Authentication may fail if ticket is required."
                )

            # Per the requests-gssapi docs, sanitize_mutual_error_response=False
            # keeps the content/headers of error responses that fail mutual
            # authentication instead of stripping them.
            self._auth = HTTPKerberosAuth(
                mutual_authentication=self.mutual_auth,
                sanitize_mutual_error_response=False,
            )
            logger.info("Kerberos authentication configured successfully")

        return self._auth

    def refresh_ticket(self) -> None:
        """Refresh the Kerberos ticket if using keytab.

        Re-runs keytab initialization and drops the cached auth handler so
        that the next ``get_auth()`` call builds a new one. Does nothing when
        no keytab is configured.

        Raises:
            FileNotFoundError: If the keytab file (or the ``kinit``
                executable) cannot be found.
            RuntimeError: If ``kinit`` fails or times out.
        """
        if self.keytab_path:
            logger.info("Refreshing Kerberos ticket")
            self._init_keytab()
            # Reset auth to force recreation
            self._auth = None