Skip to main content
Glama
auth_manager.py7.43 kB
"""Authentication manager for Gmail OAuth2 with multi-user support.""" import json import os from pathlib import Path from typing import Optional, List, Dict, Any from cryptography.fernet import Fernet from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow SCOPES = [ "https://www.googleapis.com/auth/gmail.send", "https://www.googleapis.com/auth/gmail.modify", ] class AuthManager: """Manages OAuth2 authentication for multiple Gmail users.""" def __init__(self, config_dir: Optional[Path] = None): """Initialize auth manager with configuration directory.""" self.config_dir = config_dir or Path.home() / ".gmail-mcp" self.config_dir.mkdir(exist_ok=True) self.tokens_dir = self.config_dir / "tokens" self.tokens_dir.mkdir(exist_ok=True) self.credentials_file = self.config_dir / "credentials.json" self.current_user_file = self.config_dir / "current_user.json" self.key_file = self.config_dir / ".key" self._ensure_encryption_key() def _ensure_encryption_key(self) -> None: """Ensure encryption key exists for token storage.""" if not self.key_file.exists(): key = Fernet.generate_key() self.key_file.write_bytes(key) self.key_file.chmod(0o600) # Owner read/write only def _get_cipher(self) -> Fernet: """Get encryption cipher for token storage.""" key = self.key_file.read_bytes() return Fernet(key) def _encrypt_data(self, data: Dict[str, Any]) -> bytes: """Encrypt data for secure storage.""" cipher = self._get_cipher() json_data = json.dumps(data).encode() return cipher.encrypt(json_data) def _decrypt_data(self, encrypted_data: bytes) -> Dict[str, Any]: """Decrypt data from secure storage.""" cipher = self._get_cipher() json_data = cipher.decrypt(encrypted_data) return json.loads(json_data.decode()) def set_credentials_file(self, credentials_path: str) -> None: """Set the OAuth2 client credentials file path.""" if not os.path.exists(credentials_path): raise FileNotFoundError(f"Credentials file not found: {credentials_path}") # Copy credentials to config directory with open(credentials_path, "r") as src: credentials_data = json.load(src) with open(self.credentials_file, "w") as dst: json.dump(credentials_data, dst, indent=2) self.credentials_file.chmod(0o600) def authenticate_user(self, email: Optional[str] = None) -> str: """Authenticate a user and return their email address.""" if not self.credentials_file.exists(): raise FileNotFoundError( "OAuth2 credentials not found. Please set credentials file first." ) # Create OAuth2 flow flow = InstalledAppFlow.from_client_secrets_file( str(self.credentials_file), SCOPES ) # For WSL/headless environments, use console-based auth try: creds = flow.run_local_server(port=0, open_browser=False) except Exception: # Fallback to console-based flow creds = flow.run_console() # Get user info to determine email from googleapiclient.discovery import build service = build("gmail", "v1", credentials=creds) profile = service.users().getProfile(userId="me").execute() user_email = profile["emailAddress"] # Store encrypted tokens token_data = { "token": creds.token, "refresh_token": creds.refresh_token, "token_uri": creds.token_uri, "client_id": creds.client_id, "client_secret": creds.client_secret, "scopes": creds.scopes, "email": user_email, } token_file = self.tokens_dir / f"{user_email}.json" encrypted_data = self._encrypt_data(token_data) token_file.write_bytes(encrypted_data) token_file.chmod(0o600) # Set as current user self.set_current_user(user_email) return user_email def get_credentials(self, email: Optional[str] = None) -> Optional[Credentials]: """Get credentials for a user (current user if email not specified).""" if email is None: email = self.get_current_user() if not email: return None token_file = self.tokens_dir / f"{email}.json" if not token_file.exists(): return None try: encrypted_data = token_file.read_bytes() token_data = self._decrypt_data(encrypted_data) creds = Credentials( token=token_data["token"], refresh_token=token_data["refresh_token"], token_uri=token_data["token_uri"], client_id=token_data["client_id"], client_secret=token_data["client_secret"], scopes=token_data["scopes"], ) # Refresh token if needed if not creds.valid: if creds.expired and creds.refresh_token: creds.refresh(Request()) # Update stored tokens token_data.update( { "token": creds.token, "refresh_token": creds.refresh_token, } ) encrypted_data = self._encrypt_data(token_data) token_file.write_bytes(encrypted_data) else: return None return creds except Exception: return None def get_current_user(self) -> Optional[str]: """Get the currently active user email.""" if not self.current_user_file.exists(): return None try: with open(self.current_user_file, "r") as f: data = json.load(f) return data.get("email") except Exception: return None def set_current_user(self, email: str) -> None: """Set the currently active user.""" with open(self.current_user_file, "w") as f: json.dump({"email": email}, f, indent=2) self.current_user_file.chmod(0o600) def list_users(self) -> List[str]: """List all authenticated users.""" users = [] for token_file in self.tokens_dir.glob("*.json"): email = token_file.stem users.append(email) return sorted(users) def remove_user(self, email: str) -> bool: """Remove a user's authentication.""" token_file = self.tokens_dir / f"{email}.json" if token_file.exists(): token_file.unlink() # If this was the current user, clear current user if self.get_current_user() == email: if self.current_user_file.exists(): self.current_user_file.unlink() return True return False def logout_current_user(self) -> bool: """Logout the current user.""" current_user = self.get_current_user() if current_user: return self.remove_user(current_user) return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/parthashirolkar/gmail-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server