Skip to main content
Glama

IMAP MCP Server

by non-dirty
"""Browser-based OAuth2 authentication for Gmail.""" import base64 import json import logging import os import secrets import sys import time import webbrowser from pathlib import Path from typing import Dict, Optional, Tuple from urllib.parse import urlencode, urlparse, parse_qs import yaml from flask import Flask, request, redirect, url_for logger = logging.getLogger(__name__) # Gmail OAuth2 endpoints GMAIL_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth" GMAIL_TOKEN_URL = "https://oauth2.googleapis.com/token" GMAIL_SCOPES = ["https://mail.google.com/"] # Local server details DEFAULT_CALLBACK_PORT = 8080 DEFAULT_CALLBACK_HOST = "localhost" CALLBACK_PATH = "/oauth2callback" SUCCESS_PATH = "/success" # In-memory token storage auth_tokens = { "access_token": None, "refresh_token": None, "token_expiry": None, } def create_oauth_app() -> Flask: """Create the Flask app for OAuth2 callback handling.""" app = Flask(__name__) @app.route(CALLBACK_PATH) def oauth2callback(): # Get authorization code from query parameters code = request.args.get("code") if not code: return "Error: No authorization code received", 400 # Exchange code for tokens client_id = app.config.get("client_id") client_secret = app.config.get("client_secret") # Make token request try: import requests token_data = { "code": code, "client_id": client_id, "client_secret": client_secret, "redirect_uri": app.config.get("redirect_uri"), "grant_type": "authorization_code", } response = requests.post(GMAIL_TOKEN_URL, data=token_data) response.raise_for_status() # Raise exception for 4XX/5XX responses tokens = response.json() # Store tokens in memory auth_tokens["access_token"] = tokens.get("access_token") auth_tokens["refresh_token"] = tokens.get("refresh_token") auth_tokens["token_expiry"] = int(time.time()) + tokens.get("expires_in", 3600) logger.info("Successfully obtained OAuth2 tokens") return redirect(url_for("success")) except Exception as e: logger.error(f"Error exchanging authorization code: 
{e}") return f"Error: Failed to exchange authorization code: {e}", 500 @app.route(SUCCESS_PATH) def success(): """Success page shown after successful authentication.""" return """ <html> <head> <title>Authentication Successful</title> <style> body { font-family: Arial, sans-serif; line-height: 1.6; margin: 30px; max-width: 800px; margin: 0 auto; padding: 20px; } .success { background-color: #d4edda; color: #155724; padding: 15px; border-radius: 4px; margin: 20px 0; } </style> </head> <body> <h1>Authentication Successful!</h1> <div class="success"> <p>You have successfully authenticated with Gmail.</p> <p>You may now close this browser window and return to the application.</p> </div> </body> </html> """ return app def run_local_server( client_id: str, client_secret: str, port: int = DEFAULT_CALLBACK_PORT, host: str = DEFAULT_CALLBACK_HOST, ) -> Tuple[Optional[str], Optional[str], Optional[int]]: """Run a local server to handle the OAuth2 callback. Args: client_id: OAuth2 client ID client_secret: OAuth2 client secret port: Port for the local server host: Host for the local server Returns: Tuple of (access_token, refresh_token, expiry) or (None, None, None) if failed """ app = create_oauth_app() # Set up the redirect URI redirect_uri = f"http://{host}:{port}{CALLBACK_PATH}" # Store the client credentials in the app config app.config.update( client_id=client_id, client_secret=client_secret, redirect_uri=redirect_uri, ) # Set up the authorization URL state = secrets.token_urlsafe(16) # Generate a random state parameter auth_params = { "client_id": client_id, "redirect_uri": redirect_uri, "response_type": "code", "scope": " ".join(GMAIL_SCOPES), "access_type": "offline", "state": state, "prompt": "consent", # Force consent screen to get refresh token } auth_url = f"{GMAIL_AUTH_URL}?{urlencode(auth_params)}" # Clear any previous tokens auth_tokens["access_token"] = None auth_tokens["refresh_token"] = None auth_tokens["token_expiry"] = None print(f"\nOpening browser for 
Gmail authentication...") webbrowser.open(auth_url) print(f"\nWaiting for authentication at http://{host}:{port}{CALLBACK_PATH}") # Run the Flask app for a short period # We need to run it in a separate thread to avoid blocking import threading # Flag to signal when the server should stop server_should_stop = threading.Event() def run_server(): """Run the Flask server until stopped.""" # Create a custom server that can be stopped from werkzeug.serving import make_server server = make_server(host, port, app, threaded=True) server.timeout = 0.5 # Check for stop flag every 0.5 seconds while not server_should_stop.is_set(): server.handle_request() # Start the server thread server_thread = threading.Thread(target=run_server) server_thread.daemon = True server_thread.start() # Wait for authentication to complete (or timeout) try: # Wait for up to 5 minutes max_wait_time = 5 * 60 # 5 minutes in seconds start_time = time.time() while time.time() - start_time < max_wait_time: if auth_tokens["access_token"] is not None: # Authentication completed successfully break # Check every 1 second time.sleep(1) # Check if we timed out if auth_tokens["access_token"] is None: print("\nAuthentication timed out. Please try again.") return None, None, None finally: # Stop the server server_should_stop.set() server_thread.join(timeout=5) return ( auth_tokens["access_token"], auth_tokens["refresh_token"], auth_tokens["token_expiry"], ) def load_client_credentials(credentials_file: str) -> Tuple[str, str]: """ Load client credentials from the downloaded JSON file. 
Args: credentials_file: Path to the credentials JSON file Returns: Tuple of (client_id, client_secret) Raises: FileNotFoundError: If the credentials file doesn't exist ValueError: If the credentials file is invalid """ if not credentials_file: raise ValueError("No credentials file specified") credentials_path = Path(credentials_file) if not credentials_path.exists(): raise FileNotFoundError(f"Credentials file not found: {credentials_file}") try: with open(credentials_path) as f: credentials = json.load(f) if "installed" in credentials: client_config = credentials["installed"] elif "web" in credentials: client_config = credentials["web"] else: raise ValueError(f"Invalid credentials format in {credentials_file}") client_id = client_config.get("client_id") client_secret = client_config.get("client_secret") if not client_id or not client_secret: raise ValueError(f"Missing client_id or client_secret in {credentials_file}") return client_id, client_secret except json.JSONDecodeError: raise ValueError(f"Invalid JSON in credentials file: {credentials_file}") def perform_oauth_flow( client_id: Optional[str] = None, client_secret: Optional[str] = None, credentials_file: Optional[str] = None, port: int = DEFAULT_CALLBACK_PORT, config_path: Optional[str] = None, config_output: Optional[str] = None, ) -> Dict: """Run the OAuth flow to get Gmail access and refresh tokens. 
Args: client_id: OAuth2 client ID (optional, will prompt if not provided) client_secret: OAuth2 client secret (optional, will prompt if not provided) port: Port for the local server config_path: Path to existing config file to update (optional) config_output: Path to save the updated config file (optional) Returns: Updated configuration dictionary """ # Try to load credentials from file first if provided if credentials_file and not (client_id and client_secret): try: logger.info(f"Attempting to load credentials from {credentials_file}") loaded_client_id, loaded_client_secret = load_client_credentials(credentials_file) client_id = client_id or loaded_client_id client_secret = client_secret or loaded_client_secret logger.info("Successfully loaded credentials from file") except Exception as e: logger.warning(f"Failed to load credentials from file: {e}") # Use environment variables if not provided client_id = client_id or os.environ.get("GMAIL_CLIENT_ID") client_secret = client_secret or os.environ.get("GMAIL_CLIENT_SECRET") # Prompt for client_id and client_secret if not provided if not client_id: client_id = input("Enter your Google OAuth2 client ID: ").strip() if not client_secret: client_secret = input("Enter your Google OAuth2 client secret: ").strip() if not client_id or not client_secret: print("Error: Client ID and secret are required.") sys.exit(1) # Run the OAuth flow print("Starting OAuth2 authentication flow...") access_token, refresh_token, expiry = run_local_server( client_id=client_id, client_secret=client_secret, port=port, ) if not access_token or not refresh_token: print("Error: Failed to obtain OAuth2 tokens.") sys.exit(1) print("Authentication successful!") # Build OAuth2 configuration oauth2_data = { "client_id": client_id, "client_secret": client_secret, "refresh_token": refresh_token, "access_token": access_token, "token_expiry": expiry, } # Load existing config if specified config_data = {} if config_path: config_file = Path(config_path) if 
config_file.exists(): with open(config_file, "r") as f: config_data = yaml.safe_load(f) or {} logger.info(f"Loaded existing configuration from {config_path}") # Update config with OAuth2 data if "imap" not in config_data: config_data["imap"] = {} config_data["imap"]["oauth2"] = oauth2_data # Save updated config if output path specified if config_output: output_file = Path(config_output) output_file.parent.mkdir(parents=True, exist_ok=True) with open(output_file, "w") as f: yaml.dump(config_data, f, default_flow_style=False) logger.info(f"Saved updated configuration to {config_output}") print("\nOAuth2 configuration:") print(json.dumps(oauth2_data, indent=2, default=str)) print("\nTo use these credentials, add them to your config.yaml file under the imap.oauth2 key.") print("Alternatively, you can set the following environment variables:") print(f" GMAIL_CLIENT_ID={client_id}") print(f" GMAIL_CLIENT_SECRET={client_secret}") print(f" GMAIL_REFRESH_TOKEN={refresh_token}") return config_data def main(): """Run the browser-based OAuth2 setup tool.""" import argparse parser = argparse.ArgumentParser(description="Browser-based OAuth2 authentication for Gmail") parser.add_argument( "--client-id", help="Google OAuth2 client ID", default=os.environ.get("GMAIL_CLIENT_ID"), ) parser.add_argument( "--client-secret", help="Google OAuth2 client secret", default=os.environ.get("GMAIL_CLIENT_SECRET"), ) parser.add_argument( "--port", type=int, help="Port for the local callback server", default=DEFAULT_CALLBACK_PORT, ) parser.add_argument( "--config", help="Path to existing config file to update", default=None, ) parser.add_argument( "--output", help="Path to save the updated config file", default="config.yaml", ) args = parser.parse_args() # Configure logging logging.basicConfig(level=logging.INFO) perform_oauth_flow( client_id=args.client_id, client_secret=args.client_secret, port=args.port, config_path=args.config, config_output=args.output, ) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/non-dirty/imap-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.