# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Roger Gujord
# https://github.com/gujord/OpenAPI-MCP
import os
import time
import logging
import httpx
from typing import Optional
try:
from .exceptions import AuthenticationError
except ImportError:
from exceptions import AuthenticationError
class OAuthTokenCache:
"""Manages OAuth token caching with automatic expiration."""
def __init__(self):
self._token: Optional[str] = None
self._expiry: float = 0
def get_token(self) -> Optional[str]:
"""Get cached token if still valid."""
if self._token and time.time() < self._expiry:
return self._token
return None
def set_token(self, token: str, expires_in: int = 3600) -> None:
"""Cache token with expiration time."""
self._token = token
self._expiry = time.time() + expires_in
def clear_token(self) -> None:
"""Clear cached token."""
self._token = None
self._expiry = 0
class UsernamePasswordAuthenticator:
"""Handles username/password authentication for API requests."""
def __init__(self, username: str, password: str, login_endpoint: str = None):
self._cache = OAuthTokenCache() # Reuse the token cache
self._username = username
self._password = password
self._login_endpoint = login_endpoint
def get_access_token(self) -> Optional[str]:
"""Get valid access token, refreshing if necessary."""
# Try cached token first
token = self._cache.get_token()
if token:
return token
if not self._login_endpoint:
logging.info("No login endpoint configured; cannot authenticate")
return None
return self._fetch_new_token()
def _fetch_new_token(self) -> str:
"""Fetch new token using username/password."""
try:
# Try form data first (common for OAuth2-style endpoints)
response = httpx.post(
self._login_endpoint,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"grant_type": "password",
"username": self._username,
"password": self._password,
"scope": ""
}
)
if response.status_code == 422:
# If form data fails, try JSON
response = httpx.post(
self._login_endpoint,
headers={"Content-Type": "application/json"},
json={
"username": self._username,
"password": self._password
}
)
response.raise_for_status()
token_data = response.json()
access_token = token_data.get("access_token")
if not access_token:
raise AuthenticationError("No access_token in login response")
expires_in = token_data.get("expires_in", 3600)
self._cache.set_token(access_token, expires_in)
logging.info("Login successful, token obtained")
return access_token
except httpx.HTTPStatusError as e:
raise AuthenticationError(f"Login failed: {e.response.status_code} {e.response.text}")
except Exception as e:
raise AuthenticationError(f"Failed to authenticate: {e}")
def add_auth_headers(self, headers: dict) -> dict:
"""Add authentication headers to request."""
token = self.get_access_token()
if token:
headers["Authorization"] = f"Bearer {token}"
return headers
def is_configured(self) -> bool:
"""Check if username/password auth is properly configured."""
return bool(self._username and self._password and self._login_endpoint)
class OAuthAuthenticator:
"""Handles OAuth authentication for API requests."""
def __init__(self):
self._cache = OAuthTokenCache()
self._client_id = os.environ.get("OAUTH_CLIENT_ID")
self._client_secret = os.environ.get("OAUTH_CLIENT_SECRET")
self._token_url = os.environ.get("OAUTH_TOKEN_URL")
self._scope = os.environ.get("OAUTH_SCOPE", "api")
def get_access_token(self) -> Optional[str]:
"""Get valid access token, refreshing if necessary."""
# Try cached token first
token = self._cache.get_token()
if token:
return token
# If no OAuth credentials, return None (API may work without auth)
if not all([self._client_id, self._client_secret, self._token_url]):
logging.info("No OAuth credentials provided; proceeding without authentication")
return None
return self._fetch_new_token()
def _fetch_new_token(self) -> str:
"""Fetch new token from OAuth server."""
try:
response = httpx.post(
self._token_url,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"grant_type": "client_credentials",
"client_id": self._client_id,
"client_secret": self._client_secret,
"scope": self._scope
}
)
response.raise_for_status()
token_data = response.json()
access_token = token_data.get("access_token")
if not access_token:
raise AuthenticationError("No access_token in OAuth response")
expires_in = token_data.get("expires_in", 3600)
self._cache.set_token(access_token, expires_in)
logging.info("OAuth token obtained successfully")
return access_token
except httpx.HTTPStatusError as e:
raise AuthenticationError(f"OAuth token request failed: {e.response.status_code} {e.response.text}")
except Exception as e:
raise AuthenticationError(f"Failed to obtain OAuth token: {e}")
def add_auth_headers(self, headers: dict) -> dict:
"""Add authentication headers to request."""
token = self.get_access_token()
if token:
headers["Authorization"] = f"Bearer {token}"
return headers
def is_configured(self) -> bool:
"""Check if OAuth is properly configured."""
return all([self._client_id, self._client_secret, self._token_url])
class AuthenticationManager:
"""Manages different authentication methods."""
def __init__(self, config):
self._oauth_auth = None
self._username_auth = None
# Initialize OAuth authenticator if configured
if config.is_oauth_configured():
self._oauth_auth = OAuthAuthenticator()
logging.info("OAuth authentication configured")
# Initialize username/password authenticator if configured
if config.is_username_auth_configured():
login_endpoint = config.login_endpoint
if not login_endpoint:
# Try to auto-detect login endpoint from common patterns
base_url = config.openapi_url.rsplit('/', 1)[0]
login_endpoint = f"{base_url}/auth/token"
logging.info(f"Auto-detected login endpoint: {login_endpoint}")
self._username_auth = UsernamePasswordAuthenticator(
config.username,
config.password,
login_endpoint
)
logging.info("Username/password authentication configured")
def get_access_token(self) -> Optional[str]:
"""Get access token using the configured authentication method."""
# Try username/password auth first if configured
if self._username_auth and self._username_auth.is_configured():
try:
return self._username_auth.get_access_token()
except AuthenticationError as e:
logging.warning(f"Username/password authentication failed: {e}")
# Fall back to OAuth if configured
if self._oauth_auth and self._oauth_auth.is_configured():
try:
return self._oauth_auth.get_access_token()
except AuthenticationError as e:
logging.warning(f"OAuth authentication failed: {e}")
logging.info("No authentication configured or all methods failed")
return None
def add_auth_headers(self, headers: dict) -> dict:
"""Add authentication headers to request."""
token = self.get_access_token()
if token:
headers["Authorization"] = f"Bearer {token}"
return headers
def is_configured(self) -> bool:
"""Check if any authentication method is configured."""
return (
(self._oauth_auth and self._oauth_auth.is_configured()) or
(self._username_auth and self._username_auth.is_configured())
)