Skip to main content
Glama

MCP Atlassian

by ArconixForge
oauth.py19.8 kB
"""OAuth 2.0 utilities for Atlassian Cloud authentication. This module provides utilities for OAuth 2.0 (3LO) authentication with Atlassian Cloud. It handles: - OAuth configuration - Token acquisition, storage, and refresh - Session configuration for API clients """ import json import logging import os import pprint import time import urllib.parse from dataclasses import dataclass from pathlib import Path from typing import Any, Optional import keyring import requests # Configure logging logger = logging.getLogger("mcp-atlassian.oauth") # Constants TOKEN_URL = "https://auth.atlassian.com/oauth/token" # noqa: S105 - This is a public API endpoint URL, not a password AUTHORIZE_URL = "https://auth.atlassian.com/authorize" CLOUD_ID_URL = "https://api.atlassian.com/oauth/token/accessible-resources" TOKEN_EXPIRY_MARGIN = 300 # 5 minutes in seconds KEYRING_SERVICE_NAME = "mcp-atlassian-oauth" @dataclass class OAuthConfig: """OAuth 2.0 configuration for Atlassian Cloud. This class manages the OAuth configuration and tokens. It handles: - Authentication configuration (client credentials) - Token acquisition and refreshing - Token storage and retrieval - Cloud ID identification """ client_id: str client_secret: str redirect_uri: str scope: str cloud_id: str | None = None refresh_token: str | None = None access_token: str | None = None expires_at: float | None = None @property def is_token_expired(self) -> bool: """Check if the access token is expired or will expire soon. Returns: True if the token is expired or will expire soon, False otherwise. """ # If we don't have a token or expiry time, consider it expired if not self.access_token or not self.expires_at: return True # Consider the token expired if it will expire within the margin return time.time() + TOKEN_EXPIRY_MARGIN >= self.expires_at def get_authorization_url(self, state: str) -> str: """Get the authorization URL for the OAuth 2.0 flow. Args: state: Random state string for CSRF protection Returns: The authorization URL to redirect the user to. """ params = { "audience": "api.atlassian.com", "client_id": self.client_id, "scope": self.scope, "redirect_uri": self.redirect_uri, "response_type": "code", "prompt": "consent", "state": state, } return f"{AUTHORIZE_URL}?{urllib.parse.urlencode(params)}" def exchange_code_for_tokens(self, code: str) -> bool: """Exchange the authorization code for access and refresh tokens. Args: code: The authorization code from the callback Returns: True if tokens were successfully acquired, False otherwise. """ try: payload = { "grant_type": "authorization_code", "client_id": self.client_id, "client_secret": self.client_secret, "code": code, "redirect_uri": self.redirect_uri, } logger.info(f"Exchanging authorization code for tokens at {TOKEN_URL}") logger.debug(f"Token exchange payload: {pprint.pformat(payload)}") response = requests.post(TOKEN_URL, data=payload) # Log more details about the response logger.debug(f"Token exchange response status: {response.status_code}") logger.debug( f"Token exchange response headers: {pprint.pformat(response.headers)}" ) logger.debug(f"Token exchange response body: {response.text[:500]}...") if not response.ok: logger.error( f"Token exchange failed with status {response.status_code}. Response: {response.text}" ) return False # Parse the response token_data = response.json() # Check if required tokens are present if "access_token" not in token_data: logger.error( f"Access token not found in response. Keys found: {list(token_data.keys())}" ) return False if "refresh_token" not in token_data: logger.error( "Refresh token not found in response. Ensure 'offline_access' scope is included. " f"Keys found: {list(token_data.keys())}" ) return False self.access_token = token_data["access_token"] self.refresh_token = token_data["refresh_token"] self.expires_at = time.time() + token_data["expires_in"] # Get the cloud ID using the access token self._get_cloud_id() # Save the tokens self._save_tokens() # Log success message with token details logger.info( f"✅ OAuth token exchange successful! Access token expires in {token_data['expires_in']}s." ) logger.info( f"Access Token (partial): {self.access_token[:10]}...{self.access_token[-5:] if self.access_token else ''}" ) logger.info( f"Refresh Token (partial): {self.refresh_token[:5]}...{self.refresh_token[-3:] if self.refresh_token else ''}" ) if self.cloud_id: logger.info(f"Cloud ID successfully retrieved: {self.cloud_id}") else: logger.warning( "Cloud ID was not retrieved after token exchange. Check accessible resources." ) return True except requests.exceptions.RequestException as e: logger.error(f"Network error during token exchange: {e}", exc_info=True) return False except json.JSONDecodeError as e: logger.error( f"Failed to decode JSON response from token endpoint: {e}", exc_info=True, ) logger.error( f"Response text that failed to parse: {response.text if 'response' in locals() else 'Response object not available'}" ) return False except Exception as e: logger.error(f"Failed to exchange code for tokens: {e}") return False def refresh_access_token(self) -> bool: """Refresh the access token using the refresh token. Returns: True if the token was successfully refreshed, False otherwise. """ if not self.refresh_token: logger.error("No refresh token available") return False try: payload = { "grant_type": "refresh_token", "client_id": self.client_id, "client_secret": self.client_secret, "refresh_token": self.refresh_token, } logger.debug("Refreshing access token...") response = requests.post(TOKEN_URL, data=payload) response.raise_for_status() # Parse the response token_data = response.json() self.access_token = token_data["access_token"] # Refresh token might also be rotated if "refresh_token" in token_data: self.refresh_token = token_data["refresh_token"] self.expires_at = time.time() + token_data["expires_in"] # Save the tokens self._save_tokens() return True except Exception as e: logger.error(f"Failed to refresh access token: {e}") return False def ensure_valid_token(self) -> bool: """Ensure the access token is valid, refreshing if necessary. Returns: True if the token is valid (or was refreshed successfully), False otherwise. """ if not self.is_token_expired: return True return self.refresh_access_token() def _get_cloud_id(self) -> None: """Get the cloud ID for the Atlassian instance. This method queries the accessible resources endpoint to get the cloud ID. The cloud ID is needed for API calls with OAuth. """ if not self.access_token: logger.debug("No access token available to get cloud ID") return try: headers = {"Authorization": f"Bearer {self.access_token}"} response = requests.get(CLOUD_ID_URL, headers=headers) response.raise_for_status() resources = response.json() if resources and len(resources) > 0: # Use the first cloud site (most users have only one) # For users with multiple sites, they might need to specify which one to use self.cloud_id = resources[0]["id"] logger.debug(f"Found cloud ID: {self.cloud_id}") else: logger.warning("No Atlassian sites found in the response") except Exception as e: logger.error(f"Failed to get cloud ID: {e}") def _get_keyring_username(self) -> str: """Get the keyring username for storing tokens. The username is based on the client ID to allow multiple OAuth apps. Returns: A username string for keyring """ return f"oauth-{self.client_id}" def _save_tokens(self) -> None: """Save the tokens securely using keyring for later use. This allows the tokens to be reused between runs without requiring the user to go through the authorization flow again. """ try: username = self._get_keyring_username() # Store token data as JSON string in keyring token_data = { "refresh_token": self.refresh_token, "access_token": self.access_token, "expires_at": self.expires_at, "cloud_id": self.cloud_id, } # Store the token data in the system keyring keyring.set_password(KEYRING_SERVICE_NAME, username, json.dumps(token_data)) logger.debug(f"Saved OAuth tokens to keyring for {username}") # Also maintain backwards compatibility with file storage # for environments where keyring might not work self._save_tokens_to_file(token_data) except Exception as e: logger.error(f"Failed to save tokens to keyring: {e}") # Fall back to file storage if keyring fails self._save_tokens_to_file() def _save_tokens_to_file(self, token_data: dict = None) -> None: """Save the tokens to a file as fallback storage. Args: token_data: Optional dict with token data. If not provided, will use the current object attributes. """ try: # Create the directory if it doesn't exist token_dir = Path.home() / ".mcp-atlassian" token_dir.mkdir(exist_ok=True) # Save the tokens to a file token_path = token_dir / f"oauth-{self.client_id}.json" if token_data is None: token_data = { "refresh_token": self.refresh_token, "access_token": self.access_token, "expires_at": self.expires_at, "cloud_id": self.cloud_id, } with open(token_path, "w") as f: json.dump(token_data, f) logger.debug(f"Saved OAuth tokens to file {token_path} (fallback storage)") except Exception as e: logger.error(f"Failed to save tokens to file: {e}") @staticmethod def load_tokens(client_id: str) -> dict[str, Any]: """Load tokens securely from keyring. Args: client_id: The OAuth client ID Returns: Dict with the token data or empty dict if no tokens found """ username = f"oauth-{client_id}" # Try to load tokens from keyring first try: token_json = keyring.get_password(KEYRING_SERVICE_NAME, username) if token_json: logger.debug(f"Loaded OAuth tokens from keyring for {username}") return json.loads(token_json) except Exception as e: logger.warning( f"Failed to load tokens from keyring: {e}. Trying file fallback." ) # Fall back to loading from file if keyring fails or returns None return OAuthConfig._load_tokens_from_file(client_id) @staticmethod def _load_tokens_from_file(client_id: str) -> dict[str, Any]: """Load tokens from a file as fallback. Args: client_id: The OAuth client ID Returns: Dict with the token data or empty dict if no tokens found """ token_path = Path.home() / ".mcp-atlassian" / f"oauth-{client_id}.json" if not token_path.exists(): return {} try: with open(token_path) as f: token_data = json.load(f) logger.debug( f"Loaded OAuth tokens from file {token_path} (fallback storage)" ) return token_data except Exception as e: logger.error(f"Failed to load tokens from file: {e}") return {} @classmethod def from_env(cls) -> Optional["OAuthConfig"]: """Create an OAuth configuration from environment variables. Returns: OAuthConfig instance or None if OAuth is not enabled """ # Check if OAuth is explicitly enabled (allows minimal config) oauth_enabled = os.getenv("ATLASSIAN_OAUTH_ENABLE", "").lower() in ( "true", "1", "yes", ) # Check for required environment variables client_id = os.getenv("ATLASSIAN_OAUTH_CLIENT_ID") client_secret = os.getenv("ATLASSIAN_OAUTH_CLIENT_SECRET") redirect_uri = os.getenv("ATLASSIAN_OAUTH_REDIRECT_URI") scope = os.getenv("ATLASSIAN_OAUTH_SCOPE") # Full OAuth configuration (traditional mode) if all([client_id, client_secret, redirect_uri, scope]): # Create the OAuth configuration with full credentials config = cls( client_id=client_id, client_secret=client_secret, redirect_uri=redirect_uri, scope=scope, cloud_id=os.getenv("ATLASSIAN_OAUTH_CLOUD_ID"), ) # Try to load existing tokens token_data = cls.load_tokens(client_id) if token_data: config.refresh_token = token_data.get("refresh_token") config.access_token = token_data.get("access_token") config.expires_at = token_data.get("expires_at") if not config.cloud_id and "cloud_id" in token_data: config.cloud_id = token_data["cloud_id"] return config # Minimal OAuth configuration (user-provided tokens mode) elif oauth_enabled: # Create minimal config that works with user-provided tokens logger.info( "Creating minimal OAuth config for user-provided tokens (ATLASSIAN_OAUTH_ENABLE=true)" ) return cls( client_id="", # Will be provided by user tokens client_secret="", # Not needed for user tokens redirect_uri="", # Not needed for user tokens scope="", # Will be determined by user token permissions cloud_id=os.getenv("ATLASSIAN_OAUTH_CLOUD_ID"), # Optional fallback ) # No OAuth configuration return None @dataclass class BYOAccessTokenOAuthConfig: """OAuth configuration when providing a pre-existing access token. This class is used when the user provides their own Atlassian Cloud ID and access token directly, bypassing the full OAuth 2.0 (3LO) flow. It's suitable for scenarios like service accounts or CI/CD pipelines where an access token is already available. This configuration does not support token refreshing. """ cloud_id: str access_token: str refresh_token: None = None expires_at: None = None @classmethod def from_env(cls) -> Optional["BYOAccessTokenOAuthConfig"]: """Create a BYOAccessTokenOAuthConfig from environment variables. Reads `ATLASSIAN_OAUTH_CLOUD_ID` and `ATLASSIAN_OAUTH_ACCESS_TOKEN`. Returns: BYOAccessTokenOAuthConfig instance or None if required environment variables are missing. """ cloud_id = os.getenv("ATLASSIAN_OAUTH_CLOUD_ID") access_token = os.getenv("ATLASSIAN_OAUTH_ACCESS_TOKEN") if not all([cloud_id, access_token]): return None return cls(cloud_id=cloud_id, access_token=access_token) def get_oauth_config_from_env() -> OAuthConfig | BYOAccessTokenOAuthConfig | None: """Get the appropriate OAuth configuration from environment variables. This function attempts to load standard OAuth configuration first (OAuthConfig). If that's not available, it tries to load a "Bring Your Own Access Token" configuration (BYOAccessTokenOAuthConfig). Returns: An instance of OAuthConfig or BYOAccessTokenOAuthConfig if environment variables are set for either, otherwise None. """ return BYOAccessTokenOAuthConfig.from_env() or OAuthConfig.from_env() def configure_oauth_session( session: requests.Session, oauth_config: OAuthConfig | BYOAccessTokenOAuthConfig ) -> bool: """Configure a requests session with OAuth 2.0 authentication. This function ensures the access token is valid and adds it to the session headers. Args: session: The requests session to configure oauth_config: The OAuth configuration to use Returns: True if the session was successfully configured, False otherwise """ logger.debug( f"configure_oauth_session: Received OAuthConfig with " f"access_token_present={bool(oauth_config.access_token)}, " f"refresh_token_present={bool(oauth_config.refresh_token)}, " f"cloud_id='{oauth_config.cloud_id}'" ) # If user provided only an access token (no refresh_token), use it directly if oauth_config.access_token and not oauth_config.refresh_token: logger.info( "configure_oauth_session: Using provided OAuth access token directly (no refresh_token)." ) session.headers["Authorization"] = f"Bearer {oauth_config.access_token}" return True logger.debug("configure_oauth_session: Proceeding to ensure_valid_token.") # Otherwise, ensure we have a valid token (refresh if needed) if isinstance(oauth_config, BYOAccessTokenOAuthConfig): logger.error( "configure_oauth_session: oauth access token configuration provided as empty string." ) return False if not oauth_config.ensure_valid_token(): logger.error( f"configure_oauth_session: ensure_valid_token returned False. " f"Token was expired: {oauth_config.is_token_expired}, " f"Refresh token present for attempt: {bool(oauth_config.refresh_token)}" ) return False session.headers["Authorization"] = f"Bearer {oauth_config.access_token}" logger.info("Successfully configured OAuth session for Atlassian Cloud API") return True

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ArconixForge/mcp-atlassian'

If you have feedback or need assistance with the MCP directory API, please join our Discord server