Skip to main content
Glama

MCP Atlassian

#!/usr/bin/env python """ OAuth 2.0 Authorization Flow Helper for MCP Atlassian This script helps with the OAuth 2.0 (3LO) authorization flow for Atlassian Cloud: 1. Opens a browser to the authorization URL 2. Starts a local server to receive the callback with the authorization code 3. Exchanges the authorization code for access and refresh tokens 4. Saves the tokens for later use by MCP Atlassian Usage: python oauth_authorize.py --client-id YOUR_CLIENT_ID --client-secret YOUR_CLIENT_SECRET --redirect-uri http://localhost:8080/callback --scope "read:jira-work write:jira-work read:confluence-space.summary offline_access" IMPORTANT: The 'offline_access' scope is required for refresh tokens to work properly. Without this scope, tokens will expire quickly and authentication will fail. Environment variables can also be used: - ATLASSIAN_OAUTH_CLIENT_ID - ATLASSIAN_OAUTH_CLIENT_SECRET - ATLASSIAN_OAUTH_REDIRECT_URI - ATLASSIAN_OAUTH_SCOPE """ import argparse import http.server import logging import os import secrets import socketserver import sys import threading import time import urllib.parse import webbrowser # Add the parent directory to the path so we can import the package sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.mcp_atlassian.utils.oauth import OAuthConfig # Configure logging (basicConfig should be called only once, ideally at the very start) # Adding lineno for better debugging. logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(name)s - %(lineno)d - %(message)s", force=True, ) logger = logging.getLogger("oauth-authorize") logger.setLevel(logging.DEBUG) logging.getLogger("mcp-atlassian.oauth").setLevel(logging.DEBUG) # Global variables for callback handling authorization_code = None received_state = None callback_received = False callback_error = None class CallbackHandler(http.server.BaseHTTPRequestHandler): """HTTP request handler for OAuth callback.""" def do_GET(self) -> None: # noqa: N802 """Handle GET requests (OAuth callback and favicon).""" global authorization_code, callback_received, callback_error, received_state parsed_path = urllib.parse.urlparse(self.path) logger.debug(f"CallbackHandler received GET request for: {self.path}") # Ignore favicon requests politely if parsed_path.path == "/favicon.ico": self.send_error(404, "File not found") logger.debug("CallbackHandler: Ignored /favicon.ico request.") return # Process only /callback path if parsed_path.path != "/callback": self.send_error(404, "Not Found: Only /callback is supported.") logger.warning( f"CallbackHandler: Received request for unexpected path: {parsed_path.path}" ) return # Parse the query parameters from the URL query = parsed_path.query params = urllib.parse.parse_qs(query) if "error" in params: callback_error = params["error"][0] callback_received = True logger.error(f"Authorization error from callback: {callback_error}") self._send_response(f"Authorization failed: {callback_error}") return if "code" in params: authorization_code = params["code"][0] if "state" in params: received_state = params["state"][0] callback_received = True logger.info( "Authorization code and state received successfully via callback." ) self._send_response( "Authorization successful! You can close this window now." ) else: logger.error("Invalid callback: 'code' or 'error' parameter missing.") self._send_response( "Invalid callback: Authorization code missing", status=400 ) def _send_response(self, message: str, status: int = 200) -> None: """Send response to the browser.""" self.send_response(status) self.send_header("Content-type", "text/html") self.end_headers() html = f""" <!DOCTYPE html> <html> <head> <title>Atlassian OAuth Authorization</title> <style> body {{ font-family: Arial, sans-serif; text-align: center; padding: 40px; max-width: 600px; margin: 0 auto; }} .message {{ padding: 20px; border-radius: 5px; margin-bottom: 20px; }} .success {{ background-color: #d4edda; color: #155724; border: 1px solid #c3e6cb; }} .error {{ background-color: #f8d7da; color: #721c24; border: 1px solid #f5c6cb; }} </style> </head> <body> <h1>Atlassian OAuth Authorization</h1> <div class="message {"success" if status == 200 else "error"}"> <p>{message}</p> </div> <p>This window will automatically close in 5 seconds...</p> <script> setTimeout(function() {{ window.close(); }}, 5000); </script> </body> </html> """ self.wfile.write(html.encode()) # Make the server quiet def log_message(self, format: str, *args: str) -> None: return def start_callback_server(port: int) -> socketserver.TCPServer: """Start a local server to receive the OAuth callback.""" handler = CallbackHandler httpd = socketserver.TCPServer(("", port), handler) server_thread = threading.Thread(target=httpd.serve_forever) server_thread.daemon = True server_thread.start() return httpd def wait_for_callback(timeout: int = 300) -> bool: """Wait for the callback to be received.""" start_time = time.time() while not callback_received and (time.time() - start_time) < timeout: time.sleep(1) if not callback_received: logger.error( f"Timed out waiting for authorization callback after {timeout} seconds" ) return False if callback_error: logger.error(f"Authorization error: {callback_error}") return False return True def parse_redirect_uri(redirect_uri: str) -> tuple[str, int]: """Parse the redirect URI to extract host and port.""" parsed = urllib.parse.urlparse(redirect_uri) port = parsed.port or (443 if parsed.scheme == "https" else 80) return parsed.hostname, port def run_oauth_flow(args: argparse.Namespace) -> bool: """Run the OAuth 2.0 authorization flow.""" # Create OAuth configuration oauth_config = OAuthConfig( client_id=args.client_id, client_secret=args.client_secret, redirect_uri=args.redirect_uri, scope=args.scope, ) # Generate a random state for CSRF protection state = secrets.token_urlsafe(16) # Start local callback server if using localhost hostname, port = parse_redirect_uri(args.redirect_uri) httpd = None if hostname and hostname.lower() in ["localhost", "127.0.0.1"]: logger.info(f"Attempting to start local callback server on {hostname}:{port}") try: httpd = start_callback_server(port) except OSError as e: logger.error(f"Failed to start callback server: {e}") logger.error(f"Make sure port {port} is available and not in use") return False # Get the authorization URL auth_url = oauth_config.get_authorization_url(state=state) # Open the browser for authorization logger.info(f"Opening browser for authorization at {auth_url}") webbrowser.open(auth_url) logger.info( "If the browser doesn't open automatically, please visit this URL manually." ) # Wait for the callback if not wait_for_callback(): if httpd: httpd.shutdown() return False # Verify state to prevent CSRF attacks if received_state != state: logger.error( f"State mismatch! Possible CSRF attack. Expected: {state}, Received: {received_state}" ) if httpd: httpd.shutdown() return False logger.info("CSRF state verified successfully.") # Exchange the code for tokens logger.info("Exchanging authorization code for tokens...") if not authorization_code: logger.error("Authorization code is missing, cannot exchange for tokens.") if httpd: httpd.shutdown() return False if oauth_config.exchange_code_for_tokens(authorization_code): logger.info("🎉 OAuth authorization flow completed successfully!") if oauth_config.cloud_id: logger.info(f"Retrieved Cloud ID: {oauth_config.cloud_id}") logger.info( "\n💡 Tip: Add/update the following in your .env file or environment variables:" ) logger.info(f"ATLASSIAN_OAUTH_CLIENT_ID={oauth_config.client_id}") logger.info(f"ATLASSIAN_OAUTH_CLIENT_SECRET={oauth_config.client_secret}") logger.info(f"ATLASSIAN_OAUTH_REDIRECT_URI={oauth_config.redirect_uri}") logger.info(f"ATLASSIAN_OAUTH_SCOPE={oauth_config.scope}") logger.info(f"ATLASSIAN_OAUTH_CLOUD_ID={oauth_config.cloud_id}") else: logger.warning( "Cloud ID could not be obtained. Some API calls might require it." ) if httpd: httpd.shutdown() return True else: logger.error("Failed to exchange authorization code for tokens") if httpd: httpd.shutdown() return False def main() -> int: """Main entry point.""" parser = argparse.ArgumentParser( description="OAuth 2.0 Authorization Flow Helper for MCP Atlassian" ) parser.add_argument("--client-id", help="OAuth Client ID") parser.add_argument("--client-secret", help="OAuth Client Secret") parser.add_argument( "--redirect-uri", help="OAuth Redirect URI (e.g., http://localhost:8080/callback)", ) parser.add_argument("--scope", help="OAuth Scope (space-separated)") args = parser.parse_args() # Check for environment variables if arguments are not provided if not args.client_id: args.client_id = os.getenv("ATLASSIAN_OAUTH_CLIENT_ID") if not args.client_secret: args.client_secret = os.getenv("ATLASSIAN_OAUTH_CLIENT_SECRET") if not args.redirect_uri: args.redirect_uri = os.getenv("ATLASSIAN_OAUTH_REDIRECT_URI") if not args.scope: args.scope = os.getenv("ATLASSIAN_OAUTH_SCOPE") # Validate required arguments missing = [] if not args.client_id: missing.append("client-id") if not args.client_secret: missing.append("client-secret") if not args.redirect_uri: missing.append("redirect-uri") if not args.scope: missing.append("scope") if missing: logger.error(f"Missing required arguments: {', '.join(missing)}") parser.print_help() return 1 # Check for offline_access scope if args.scope and "offline_access" not in args.scope.split(): logger.warning("\n⚠️ WARNING: The 'offline_access' scope is missing!") logger.warning( "Without this scope, refresh tokens will not be issued and authentication will fail when tokens expire." ) logger.warning("Consider adding 'offline_access' to your scope string.") proceed = input("Do you want to proceed anyway? (y/n): ") if proceed.lower() != "y": return 1 success = run_oauth_flow(args) return 0 if success else 1 if __name__ == "__main__": sys.exit(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sooperset/mcp-atlassian'

If you have feedback or need assistance with the MCP directory API, please join our Discord server