Skip to main content
Glama

Google Workspace MCP Server - Control Gmail, Calendar, Docs, Sheets, Slides, Chat, Forms & Drive

google_auth.py (37.4 kB)
# auth/google_auth.py

import asyncio
import json
import jwt
import logging
import os

from typing import List, Optional, Tuple, Dict, Any
from urllib.parse import parse_qs, urlparse

from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import Flow
from google.auth.transport.requests import Request
from google.auth.exceptions import RefreshError
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from auth.scopes import SCOPES, get_current_scopes  # noqa
from auth.oauth21_session_store import get_oauth21_session_store
from auth.credential_store import get_credential_store
from auth.oauth_config import get_oauth_config, is_stateless_mode
from core.config import (
    get_transport_mode,
    get_oauth_redirect_uri,
)
from core.context import get_fastmcp_session_id

# Try to import FastMCP dependencies (may not be available in all environments)
try:
    from fastmcp.server.dependencies import get_context as get_fastmcp_context
except ImportError:
    get_fastmcp_context = None

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Constants
def get_default_credentials_dir() -> str:
    """
    Get the default credentials directory path, preferring user-specific locations.

    Resolution order:
        1. The GOOGLE_MCP_CREDENTIALS_DIR environment variable, if set and non-empty.
        2. ``~/.google_workspace_mcp/credentials`` when the home directory can be expanded.
        3. ``.credentials`` under the current working directory.

    Returns:
        The directory path as a string. The directory is not created here.
    """
    # Check for explicit environment variable override (read once)
    override_dir = os.getenv("GOOGLE_MCP_CREDENTIALS_DIR")
    if override_dir:
        return override_dir

    # Use user home directory for credentials storage
    home_dir = os.path.expanduser("~")
    if home_dir and home_dir != "~":  # expanduser returns "~" unchanged when it cannot expand
        return os.path.join(home_dir, ".google_workspace_mcp", "credentials")

    # Fallback to current working directory if home directory is not accessible
    return os.path.join(os.getcwd(), ".credentials")


DEFAULT_CREDENTIALS_DIR = get_default_credentials_dir()

# Session credentials now handled by OAuth21SessionStore - no local cache needed
# Centralized Client Secrets Path Logic
_client_secrets_env = os.getenv("GOOGLE_CLIENT_SECRET_PATH") or os.getenv(
    "GOOGLE_CLIENT_SECRETS"
)
if _client_secrets_env:
    CONFIG_CLIENT_SECRETS_PATH = _client_secrets_env
else:
    # Assumes this file is in auth/ and client_secret.json is in the root
    CONFIG_CLIENT_SECRETS_PATH = os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "client_secret.json",
    )


# --- Helper Functions ---


def _has_required_scopes(credentials: Credentials, required_scopes: List[str]) -> bool:
    """
    Return True when every scope in ``required_scopes`` is present on ``credentials``.

    ``credentials.scopes`` may be None; it is treated as an empty grant so the
    membership test cannot raise TypeError.
    """
    granted_scopes = credentials.scopes or []
    return all(scope in granted_scopes for scope in required_scopes)


def _find_any_credentials(
    base_dir: str = DEFAULT_CREDENTIALS_DIR,
) -> Optional[Credentials]:
    """
    Find and load any valid credentials from the credential store.
    Used in single-user mode to bypass session-to-OAuth mapping.

    Args:
        base_dir: Kept for backward compatibility; not read by this function,
            which goes through the credential store instead.

    Returns:
        Credentials of the first user listed by the store, or None if the store
        lists no users, the credentials cannot be loaded, or an error occurs.
    """
    try:
        store = get_credential_store()
        users = store.list_users()
        if not users:
            logger.info(
                "[single-user] No users found with credentials via credential store"
            )
            return None

        # Return credentials for the first user found
        first_user = users[0]
        credentials = store.get_credential(first_user)
        if credentials:
            logger.info(
                f"[single-user] Found credentials for {first_user} via credential store"
            )
            return credentials
        else:
            logger.warning(
                f"[single-user] Could not load credentials for {first_user} via credential store"
            )

    except Exception as e:
        # Best-effort lookup: any store failure is logged and reported as "not found"
        logger.error(
            f"[single-user] Error finding credentials via credential store: {e}"
        )

    logger.info("[single-user] No valid credentials found via credential store")
    return None


def save_credentials_to_session(session_id: str, credentials: Credentials) -> None:
    """
    Saves user credentials using OAuth21SessionStore.

    The user email is read from the credentials' ``id_token`` (decoded without
    signature verification). If no email can be determined, nothing is stored
    and a warning is logged.

    Args:
        session_id: MCP session ID to bind the stored session to.
        credentials: Credentials whose tokens and client details are stored.
    """
    # Get user email from credentials if possible
    user_email = None
    if credentials and credentials.id_token:
        try:
            # Signature is deliberately not verified: the token is only used to read the email claim
            decoded_token = jwt.decode(
                credentials.id_token, options={"verify_signature": False}
            )
            user_email = decoded_token.get("email")
        except Exception as e:
            logger.debug(f"Could not decode id_token to get email: {e}")

    if user_email:
        store = get_oauth21_session_store()
        store.store_session(
            user_email=user_email,
            access_token=credentials.token,
            refresh_token=credentials.refresh_token,
            token_uri=credentials.token_uri,
            client_id=credentials.client_id,
            client_secret=credentials.client_secret,
            scopes=credentials.scopes,
            expiry=credentials.expiry,
            mcp_session_id=session_id
        )
        logger.debug(f"Credentials saved to OAuth21SessionStore for session_id: {session_id}, user: {user_email}")
    else:
        logger.warning(f"Could not save credentials to session store - no user email found for session: {session_id}")


def load_credentials_from_session(session_id: str) -> Optional[Credentials]:
    """
    Loads user credentials from OAuth21SessionStore.

    Args:
        session_id: MCP session ID to look up.

    Returns:
        Whatever the store returns for that session (None/falsy when not found).
    """
    store = get_oauth21_session_store()
    credentials = store.get_credentials_by_mcp_session(session_id)
    if credentials:
        logger.debug(
            f"Credentials loaded from OAuth21SessionStore for session_id: {session_id}"
        )
    else:
        logger.debug(
            f"No credentials found in OAuth21SessionStore for session_id: {session_id}"
        )
    return credentials


def load_client_secrets_from_env() -> Optional[Dict[str, Any]]:
    """
    Loads the client secrets from environment variables.

    Environment variables used:
        - GOOGLE_OAUTH_CLIENT_ID: OAuth 2.0 client ID
        - GOOGLE_OAUTH_CLIENT_SECRET: OAuth 2.0 client secret
        - GOOGLE_OAUTH_REDIRECT_URI: (optional) OAuth redirect URI

    Returns:
        Client secrets configuration dict compatible with Google OAuth library
        (a ``{"web": {...}}`` mapping), or None if required environment
        variables are not set.
    """
    client_id = os.getenv("GOOGLE_OAUTH_CLIENT_ID")
    client_secret = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET")
    redirect_uri = os.getenv("GOOGLE_OAUTH_REDIRECT_URI")

    if client_id and client_secret:
        # Create config structure that matches Google client secrets format
        web_config = {
            "client_id": client_id,
            "client_secret": client_secret,
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
        }

        # Add redirect_uri if provided via environment variable
        if redirect_uri:
            web_config["redirect_uris"] = [redirect_uri]

        # Return the full config structure expected by Google OAuth library
        config = {"web": web_config}

        logger.info("Loaded OAuth client credentials from environment variables")
        return config

    logger.debug("OAuth client credentials not found in environment variables")
    return None


def load_client_secrets(client_secrets_path: str) -> Dict[str, Any]:
    """
    Loads the client secrets from environment variables (preferred) or from the client secrets file.

    Priority order:
        1. Environment variables (GOOGLE_OAUTH_CLIENT_ID, GOOGLE_OAUTH_CLIENT_SECRET)
        2. File-based credentials at the specified path

    Args:
        client_secrets_path: Path to the client secrets JSON file (used as fallback)

    Returns:
        Client secrets configuration dict (the inner "web" or "installed" mapping)

    Raises:
        ValueError: If client secrets file has invalid format
        IOError: If file cannot be read and no environment variables are set
        json.JSONDecodeError: If the file is not valid JSON
    """
    # First, try to load from environment variables
    env_config = load_client_secrets_from_env()
    if env_config:
        # Extract the "web" config from the environment structure
        return env_config["web"]

    # Fall back to loading from file
    try:
        # JSON files are UTF-8 by specification; do not depend on the locale encoding
        with open(client_secrets_path, "r", encoding="utf-8") as f:
            client_config = json.load(f)

        # The file usually contains a top-level key like "web" or "installed"
        if "web" in client_config:
            logger.info(
                f"Loaded OAuth client credentials from file: {client_secrets_path}"
            )
            return client_config["web"]
        elif "installed" in client_config:
            logger.info(
                f"Loaded OAuth client credentials from file: {client_secrets_path}"
            )
            return client_config["installed"]
        else:
            logger.error(
                f"Client secrets file {client_secrets_path} has unexpected format."
            )
            # Plain ValueError is not matched by the handler below, so it propagates as-is
            raise ValueError("Invalid client secrets file format")
    except (IOError, json.JSONDecodeError) as e:
        logger.error(f"Error loading client secrets file {client_secrets_path}: {e}")
        raise


def check_client_secrets() -> Optional[str]:
    """
    Checks for the presence of OAuth client secrets, either as environment
    variables or as a file.

    Returns:
        An error message string if secrets are not found, otherwise None.
    """
    env_config = load_client_secrets_from_env()
    if not env_config and not os.path.exists(CONFIG_CLIENT_SECRETS_PATH):
        logger.error(
            f"OAuth client credentials not found. No environment variables set and no file at {CONFIG_CLIENT_SECRETS_PATH}"
        )
        return f"OAuth client credentials not found. Please set GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET environment variables or provide a client secrets file at {CONFIG_CLIENT_SECRETS_PATH}."
    return None


def create_oauth_flow(
    scopes: List[str], redirect_uri: str, state: Optional[str] = None
) -> Flow:
    """
    Creates an OAuth flow using environment variables or client secrets file.

    Args:
        scopes: OAuth scopes to request.
        redirect_uri: Redirect URI to configure on the flow.
        state: Optional OAuth state value to attach to the flow.

    Returns:
        A configured ``Flow`` instance.

    Raises:
        FileNotFoundError: If no environment credentials are set and the client
            secrets file does not exist at CONFIG_CLIENT_SECRETS_PATH.
    """
    # Try environment variables first
    env_config = load_client_secrets_from_env()
    if env_config:
        # Use client config directly
        flow = Flow.from_client_config(
            env_config, scopes=scopes, redirect_uri=redirect_uri, state=state
        )
        logger.debug("Created OAuth flow from environment variables")
        return flow

    # Fall back to file-based config
    if not os.path.exists(CONFIG_CLIENT_SECRETS_PATH):
        raise FileNotFoundError(
            f"OAuth client secrets file not found at {CONFIG_CLIENT_SECRETS_PATH} and no environment variables set"
        )

    flow = Flow.from_client_secrets_file(
        CONFIG_CLIENT_SECRETS_PATH,
        scopes=scopes,
        redirect_uri=redirect_uri,
        state=state,
    )
    logger.debug(
        f"Created OAuth flow from client secrets file: {CONFIG_CLIENT_SECRETS_PATH}"
    )
    return flow


# --- Core OAuth Logic ---


async def start_auth_flow(
    user_google_email: Optional[str],
    service_name: str,  # e.g., "Google Calendar", "Gmail" for user messages
    redirect_uri: str,  # Added redirect_uri as a required parameter
) -> str:
    """
    Initiates the Google OAuth flow and returns an actionable message for the user.

    A random OAuth state is generated, registered with the OAuth 2.1 session
    store (bound to the current FastMCP session ID when one can be obtained),
    and embedded in the authorization URL.

    Args:
        user_google_email: The user's specified Google email, if provided.
        service_name: The name of the Google service requiring auth (for user messages).
        redirect_uri: The URI Google will redirect to after authorization.

    Returns:
        A formatted string containing guidance for the LLM/user.

    Raises:
        Exception: If the OAuth flow cannot be initiated. The original error is
            attached as ``__cause__``.
    """
    initial_email_provided = bool(
        user_google_email
        and user_google_email.strip()
        and user_google_email.lower() != "default"
    )
    user_display_name = (
        f"{service_name} for '{user_google_email}'"
        if initial_email_provided
        else service_name
    )

    logger.info(
        f"[start_auth_flow] Initiating auth for {user_display_name} with scopes for enabled tools."
    )

    # Note: Caller should ensure OAuth callback is available before calling this function

    try:
        if "OAUTHLIB_INSECURE_TRANSPORT" not in os.environ and (
            "localhost" in redirect_uri or "127.0.0.1" in redirect_uri
        ):  # Use passed redirect_uri
            logger.warning(
                "OAUTHLIB_INSECURE_TRANSPORT not set. Setting it for localhost/local development."
            )
            os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

        oauth_state = os.urandom(16).hex()

        flow = create_oauth_flow(
            scopes=get_current_scopes(),  # Use scopes for enabled tools only
            redirect_uri=redirect_uri,  # Use passed redirect_uri
            state=oauth_state,
        )

        auth_url, _ = flow.authorization_url(access_type="offline", prompt="consent")

        # Bind the state to the MCP session when available; absence is not fatal
        session_id = None
        try:
            session_id = get_fastmcp_session_id()
        except Exception as e:
            logger.debug(f"Could not retrieve FastMCP session ID for state binding: {e}")

        store = get_oauth21_session_store()
        store.store_oauth_state(oauth_state, session_id=session_id)

        logger.info(
            f"Auth flow started for {user_display_name}. State: {oauth_state[:8]}... Advise user to visit: {auth_url}"
        )

        message_lines = [
            f"**ACTION REQUIRED: Google Authentication Needed for {user_display_name}**\n",
            f"To proceed, the user must authorize this application for {service_name} access using all required permissions.",
            "**LLM, please present this exact authorization URL to the user as a clickable hyperlink:**",
            f"Authorization URL: {auth_url}",
            f"Markdown for hyperlink: [Click here to authorize {service_name} access]({auth_url})\n",
            "**LLM, after presenting the link, instruct the user as follows:**",
            "1. Click the link and complete the authorization in their browser.",
        ]

        if not initial_email_provided:
            message_lines.extend(
                [
                    "2. After successful authorization, the browser page will display the authenticated email address.",
                    " **LLM: Instruct the user to provide you with this email address.**",
                    "3. Once you have the email, **retry their original command, ensuring you include this `user_google_email`.**",
                ]
            )
        else:
            message_lines.append(
                "2. After successful authorization, **retry their original command**."
            )

        message_lines.append(
            f"\nThe application will use the new credentials. If '{user_google_email}' was provided, it must match the authenticated account."
        )
        return "\n".join(message_lines)

    except FileNotFoundError as e:
        error_text = f"OAuth client credentials not found: {e}. Please either:\n1. Set environment variables: GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET\n2. Ensure '{CONFIG_CLIENT_SECRETS_PATH}' file exists"
        logger.error(error_text, exc_info=True)
        raise Exception(error_text) from e
    except Exception as e:
        error_text = f"Could not initiate authentication for {user_display_name} due to an unexpected error: {str(e)}"
        logger.error(
            f"Failed to start the OAuth flow for {user_display_name}: {e}",
            exc_info=True,
        )
        raise Exception(error_text) from e


def handle_auth_callback(
    scopes: List[str],
    authorization_response: str,
    redirect_uri: str,
    credentials_base_dir: str = DEFAULT_CREDENTIALS_DIR,
    session_id: Optional[str] = None,
    client_secrets_path: Optional[
        str
    ] = None,  # Deprecated: kept for backward compatibility
) -> Tuple[str, Credentials]:
    """
    Handles the callback from Google, exchanges the code for credentials,
    fetches user info, determines user_google_email, saves credentials
    (credential store & session store), and returns them.

    Args:
        scopes: List of OAuth scopes requested.
        authorization_response: The full callback URL from Google.
        redirect_uri: The redirect URI.
        credentials_base_dir: Base directory for credential files. Not read by
            this function; kept for backward compatibility.
        session_id: Optional MCP session ID to associate with the credentials.
        client_secrets_path: (Deprecated) Path to client secrets file. Only
            triggers a deprecation warning here.

    Returns:
        A tuple containing the user_google_email and the obtained Credentials object.

    Raises:
        ValueError: If the user email cannot be retrieved (state validation
            errors raised by the session store also propagate).
        FlowExchangeError: If the code exchange fails.
        HttpError: If fetching user info fails.
    """
    try:
        # Log deprecation warning if old parameter is used
        if client_secrets_path:
            logger.warning(
                "The 'client_secrets_path' parameter is deprecated. Use GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET environment variables instead."
            )

        # Allow HTTP for localhost in development
        # NOTE(review): unlike start_auth_flow, this is set regardless of whether
        # redirect_uri points at localhost, which disables oauthlib's HTTPS check
        # process-wide. Behaviour is preserved here because deployments may rely
        # on it - confirm whether it should be restricted to local redirect URIs.
        if "OAUTHLIB_INSECURE_TRANSPORT" not in os.environ:
            logger.warning(
                "OAUTHLIB_INSECURE_TRANSPORT not set. Setting it for localhost development."
            )
            os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

        store = get_oauth21_session_store()
        parsed_response = urlparse(authorization_response)
        state_values = parse_qs(parsed_response.query).get("state")
        state = state_values[0] if state_values else None

        # The store validates the state (and consumes it) before any token exchange happens
        state_info = store.validate_and_consume_oauth_state(state, session_id=session_id)
        logger.debug(
            "Validated OAuth callback state %s for session %s",
            (state[:8] if state else "<missing>"),
            state_info.get("session_id") or "<unknown>",
        )

        flow = create_oauth_flow(
            scopes=scopes, redirect_uri=redirect_uri, state=state
        )

        # Exchange the authorization code for credentials
        # Note: fetch_token will use the redirect_uri configured in the flow
        flow.fetch_token(authorization_response=authorization_response)
        credentials = flow.credentials
        logger.info("Successfully exchanged authorization code for tokens.")

        # Get user info to determine user_id (using email here)
        user_info = get_user_info(credentials)
        if not user_info or "email" not in user_info:
            logger.error("Could not retrieve user email from Google.")
            raise ValueError("Failed to get user email for identification.")

        user_google_email = user_info["email"]
        logger.info(f"Identified user_google_email: {user_google_email}")

        # Save the credentials
        credential_store = get_credential_store()
        credential_store.store_credential(user_google_email, credentials)

        # Always save to OAuth21SessionStore for centralized management
        store = get_oauth21_session_store()
        store.store_session(
            user_email=user_google_email,
            access_token=credentials.token,
            refresh_token=credentials.refresh_token,
            token_uri=credentials.token_uri,
            client_id=credentials.client_id,
            client_secret=credentials.client_secret,
            scopes=credentials.scopes,
            expiry=credentials.expiry,
            mcp_session_id=session_id,
            issuer="https://accounts.google.com"  # Add issuer for Google tokens
        )

        # If session_id is provided, also save to session cache for compatibility
        if session_id:
            save_credentials_to_session(session_id, credentials)

        return user_google_email, credentials

    except Exception as e:  # Catch specific exceptions like FlowExchangeError if needed
        logger.error(f"Error handling auth callback: {e}")
        raise  # Re-raise for the caller


def get_credentials(
    user_google_email: Optional[str],  # Can be None if relying on session_id
    required_scopes: List[str],
    client_secrets_path: Optional[str] = None,
    credentials_base_dir: str = DEFAULT_CREDENTIALS_DIR,
    session_id: Optional[str] = None,
) -> Optional[Credentials]:
    """
    Retrieves stored credentials, prioritizing OAuth 2.1 store, then session, then
    the credential store. Refreshes if necessary.
    If credentials are loaded from the credential store and a session_id is
    present, they are cached in the session.
    In single-user mode (MCP_SINGLE_USER_MODE=1), bypasses session mapping and
    uses any available credentials.

    Args:
        user_google_email: Optional user's Google email.
        required_scopes: List of scopes the credentials must have.
        client_secrets_path: Path to client secrets. Must be truthy for the
            legacy refresh path to run, although the refresh itself relies on
            the client details carried by the credentials.
        credentials_base_dir: Base directory for credential files.
        session_id: Optional MCP session ID.

    Returns:
        Valid Credentials object, or None when no usable credentials exist
        (not found, missing scopes, or refresh failed) and re-authentication
        is needed.
    """
    # First, try OAuth 2.1 session store if we have a session_id (FastMCP session)
    if session_id:
        try:
            store = get_oauth21_session_store()

            # Try to get credentials by MCP session
            credentials = store.get_credentials_by_mcp_session(session_id)
            if credentials:
                logger.info(f"[get_credentials] Found OAuth 2.1 credentials for MCP session {session_id}")

                # Check scopes
                if not _has_required_scopes(credentials, required_scopes):
                    logger.warning(
                        f"[get_credentials] OAuth 2.1 credentials lack required scopes. Need: {required_scopes}, Have: {credentials.scopes}"
                    )
                    return None

                # Return if valid
                if credentials.valid:
                    return credentials
                elif credentials.expired and credentials.refresh_token:
                    # Try to refresh
                    try:
                        credentials.refresh(Request())
                        logger.info(f"[get_credentials] Refreshed OAuth 2.1 credentials for session {session_id}")
                        # Update stored credentials
                        user_email = store.get_user_by_mcp_session(session_id)
                        if user_email:
                            # Pass the same client details as the other store_session
                            # calls so the stored session stays refreshable.
                            store.store_session(
                                user_email=user_email,
                                access_token=credentials.token,
                                refresh_token=credentials.refresh_token,
                                token_uri=credentials.token_uri,
                                client_id=credentials.client_id,
                                client_secret=credentials.client_secret,
                                scopes=credentials.scopes,
                                expiry=credentials.expiry,
                                mcp_session_id=session_id,
                                issuer="https://accounts.google.com"  # Add issuer for Google tokens
                            )
                        return credentials
                    except Exception as e:
                        logger.error(f"[get_credentials] Failed to refresh OAuth 2.1 credentials: {e}")
                        return None
                # Neither valid nor refreshable: fall through to the legacy lookup below
        except ImportError:
            pass  # OAuth 2.1 store not available
        except Exception as e:
            logger.debug(f"[get_credentials] Error checking OAuth 2.1 store: {e}")

    # Check for single-user mode
    if os.getenv("MCP_SINGLE_USER_MODE") == "1":
        logger.info(
            "[get_credentials] Single-user mode: bypassing session mapping, finding any credentials"
        )
        credentials = _find_any_credentials(credentials_base_dir)
        if not credentials:
            logger.info(
                f"[get_credentials] Single-user mode: No credentials found in {credentials_base_dir}"
            )
            return None

        # In single-user mode, if user_google_email wasn't provided, try to get it from user info
        # This is needed for proper credential saving after refresh
        if not user_google_email and credentials.valid:
            try:
                user_info = get_user_info(credentials)
                if user_info and "email" in user_info:
                    user_google_email = user_info["email"]
                    logger.debug(
                        f"[get_credentials] Single-user mode: extracted user email {user_google_email} from credentials"
                    )
            except Exception as e:
                logger.debug(
                    f"[get_credentials] Single-user mode: could not extract user email: {e}"
                )
    else:
        credentials = None

        # Session ID should be provided by the caller
        if not session_id:
            logger.debug("[get_credentials] No session_id provided")

        logger.debug(
            f"[get_credentials] Called for user_google_email: '{user_google_email}', session_id: '{session_id}', required_scopes: {required_scopes}"
        )

        if session_id:
            credentials = load_credentials_from_session(session_id)
            if credentials:
                logger.debug(
                    f"[get_credentials] Loaded credentials from session for session_id '{session_id}'."
                )

        if not credentials and user_google_email:
            if not is_stateless_mode():
                logger.debug(
                    f"[get_credentials] No session credentials, trying credential store for user_google_email '{user_google_email}'."
                )
                store = get_credential_store()
                credentials = store.get_credential(user_google_email)
            else:
                logger.debug(
                    f"[get_credentials] No session credentials, skipping file store in stateless mode for user_google_email '{user_google_email}'."
                )

            if credentials and session_id:
                logger.debug(
                    f"[get_credentials] Loaded from file for user '{user_google_email}', caching to session '{session_id}'."
                )
                save_credentials_to_session(
                    session_id, credentials
                )  # Cache for current session

        if not credentials:
            logger.info(
                f"[get_credentials] No credentials found for user '{user_google_email}' or session '{session_id}'."
            )
            return None

    logger.debug(
        f"[get_credentials] Credentials found. Scopes: {credentials.scopes}, Valid: {credentials.valid}, Expired: {credentials.expired}"
    )

    if not _has_required_scopes(credentials, required_scopes):
        logger.warning(
            f"[get_credentials] Credentials lack required scopes. Need: {required_scopes}, Have: {credentials.scopes}. User: '{user_google_email}', Session: '{session_id}'"
        )
        return None  # Re-authentication needed for scopes

    logger.debug(
        f"[get_credentials] Credentials have sufficient scopes. User: '{user_google_email}', Session: '{session_id}'"
    )

    if credentials.valid:
        logger.debug(
            f"[get_credentials] Credentials are valid. User: '{user_google_email}', Session: '{session_id}'"
        )
        return credentials
    elif credentials.expired and credentials.refresh_token:
        logger.info(
            f"[get_credentials] Credentials expired. Attempting refresh. User: '{user_google_email}', Session: '{session_id}'"
        )
        if not client_secrets_path:
            logger.error(
                "[get_credentials] Client secrets path required for refresh but not provided."
            )
            return None
        try:
            logger.debug(
                f"[get_credentials] Refreshing token using client_secrets_path: {client_secrets_path}"
            )
            # The secrets file is not loaded here: the refresh uses the
            # client_id/client_secret already carried by the credentials.
            credentials.refresh(Request())
            logger.info(
                f"[get_credentials] Credentials refreshed successfully. User: '{user_google_email}', Session: '{session_id}'"
            )

            # Save refreshed credentials (skip file save in stateless mode)
            if user_google_email:  # Always save to credential store if email is known
                if not is_stateless_mode():
                    credential_store = get_credential_store()
                    credential_store.store_credential(user_google_email, credentials)
                else:
                    logger.info(f"Skipping credential file save in stateless mode for {user_google_email}")

                # Also update OAuth21SessionStore
                store = get_oauth21_session_store()
                store.store_session(
                    user_email=user_google_email,
                    access_token=credentials.token,
                    refresh_token=credentials.refresh_token,
                    token_uri=credentials.token_uri,
                    client_id=credentials.client_id,
                    client_secret=credentials.client_secret,
                    scopes=credentials.scopes,
                    expiry=credentials.expiry,
                    mcp_session_id=session_id,
                    issuer="https://accounts.google.com"  # Add issuer for Google tokens
                )

            if session_id:  # Update session cache if it was the source or is active
                save_credentials_to_session(session_id, credentials)
            return credentials
        except RefreshError as e:
            logger.warning(
                f"[get_credentials] RefreshError - token expired/revoked: {e}. User: '{user_google_email}', Session: '{session_id}'"
            )
            # For RefreshError, we should return None to trigger reauthentication
            return None
        except Exception as e:
            logger.error(
                f"[get_credentials] Error refreshing credentials: {e}. User: '{user_google_email}', Session: '{session_id}'",
                exc_info=True,
            )
            return None  # Failed to refresh
    else:
        logger.warning(
            f"[get_credentials] Credentials invalid/cannot refresh. Valid: {credentials.valid}, Refresh Token: {credentials.refresh_token is not None}. User: '{user_google_email}', Session: '{session_id}'"
        )
        return None


def get_user_info(credentials: Credentials) -> Optional[Dict[str, Any]]:
    """
    Fetches basic user profile information (requires userinfo.email scope).

    Args:
        credentials: Credentials used to call the OAuth2 v2 userinfo endpoint.

    Returns:
        The userinfo response dict, or None when the credentials are missing or
        invalid, or when the request fails (errors are logged, not raised).
    """
    if not credentials or not credentials.valid:
        logger.error("Cannot get user info: Invalid or missing credentials.")
        return None
    try:
        # Using googleapiclient discovery to get user info
        # Requires 'google-api-python-client' library
        service = build("oauth2", "v2", credentials=credentials)
        user_info = service.userinfo().get().execute()
        logger.info(f"Successfully fetched user info: {user_info.get('email')}")
        return user_info
    except HttpError as e:
        logger.error(f"HttpError fetching user info: {e.status_code} {e.reason}")
        # Handle specific errors, e.g., 401 Unauthorized might mean token issue
        return None
    except Exception as e:
        logger.error(f"Unexpected error fetching user info: {e}")
        return None


# --- Centralized Google Service Authentication ---


class GoogleAuthenticationError(Exception):
    """
    Exception raised when Google authentication is required or fails.

    Attributes:
        auth_url: Optional authorization URL associated with the error.
    """

    def __init__(self, message: str, auth_url: Optional[str] = None):
        super().__init__(message)
        self.auth_url = auth_url


async def get_authenticated_google_service(
    service_name: str,  # "gmail", "calendar", "drive", "docs"
    version: str,  # "v1", "v3"
    tool_name: str,  # For logging/debugging
    user_google_email: str,  # Required - no more Optional
    required_scopes: List[str],
    session_id: Optional[str] = None,  # Session context for logging
) -> tuple[Any, str]:
    """
    Centralized Google service authentication for all MCP tools.
    Returns (service, user_email) on success or raises GoogleAuthenticationError.

    Args:
        service_name: The Google service name ("gmail", "calendar", "drive", "docs")
        version: The API version ("v1", "v3", etc.)
        tool_name: The name of the calling tool (for logging/debugging)
        user_google_email: The user's Google email address (required)
        required_scopes: List of required OAuth scopes
        session_id: Optional MCP session ID. When omitted, it is looked up from
            the FastMCP context variable, then from the FastMCP context object.

    Returns:
        tuple[service, user_email] on success. ``user_email`` is the email claim
        of the credentials' id_token when it can be decoded, otherwise
        ``user_google_email``.

    Raises:
        GoogleAuthenticationError: When authentication is required or fails.
            When an auth flow is started, the exception message carries the
            full user-facing instructions, including the authorization URL.
    """

    # Try to get FastMCP session ID if not provided
    if not session_id:
        try:
            # First try context variable (works in async context)
            session_id = get_fastmcp_session_id()
            if session_id:
                logger.debug(f"[{tool_name}] Got FastMCP session ID from context: {session_id}")
            else:
                logger.debug(f"[{tool_name}] Context variable returned None/empty session ID")
        except Exception as e:
            logger.debug(
                f"[{tool_name}] Could not get FastMCP session from context: {e}"
            )

        # Fallback to direct FastMCP context if context variable not set
        if not session_id and get_fastmcp_context:
            try:
                fastmcp_ctx = get_fastmcp_context()
                if fastmcp_ctx and hasattr(fastmcp_ctx, 'session_id'):
                    session_id = fastmcp_ctx.session_id
                    logger.debug(f"[{tool_name}] Got FastMCP session ID directly: {session_id}")
                else:
                    logger.debug(f"[{tool_name}] FastMCP context exists but no session_id attribute")
            except Exception as e:
                logger.debug(f"[{tool_name}] Could not get FastMCP context directly: {e}")

        # Final fallback: log if we still don't have session_id
        if not session_id:
            logger.warning(f"[{tool_name}] Unable to obtain FastMCP session ID from any source")

    logger.info(
        f"[{tool_name}] Attempting to get authenticated {service_name} service. Email: '{user_google_email}', Session: '{session_id}'"
    )

    # Validate email format
    if not user_google_email or "@" not in user_google_email:
        error_msg = f"Authentication required for {tool_name}. No valid 'user_google_email' provided. Please provide a valid Google email address."
        logger.info(f"[{tool_name}] {error_msg}")
        raise GoogleAuthenticationError(error_msg)

    # get_credentials is synchronous and may refresh tokens; run it off the event loop
    credentials = await asyncio.to_thread(
        get_credentials,
        user_google_email=user_google_email,
        required_scopes=required_scopes,
        client_secrets_path=CONFIG_CLIENT_SECRETS_PATH,
        session_id=session_id,  # Pass through session context
    )

    if not credentials or not credentials.valid:
        logger.warning(f"[{tool_name}] No valid credentials. Email: '{user_google_email}'.")
        logger.info(f"[{tool_name}] Valid email '{user_google_email}' provided, initiating auth flow.")

        # Ensure OAuth callback is available
        from auth.oauth_callback_server import ensure_oauth_callback_available

        redirect_uri = get_oauth_redirect_uri()
        config = get_oauth_config()
        success, error_msg = ensure_oauth_callback_available(
            get_transport_mode(), config.port, config.base_uri
        )
        if not success:
            error_detail = f" ({error_msg})" if error_msg else ""
            raise GoogleAuthenticationError(
                f"Cannot initiate OAuth flow - callback server unavailable{error_detail}"
            )

        # Generate the user-facing auth instructions (which include the auth URL)
        auth_response = await start_auth_flow(
            user_google_email=user_google_email,
            service_name=f"Google {service_name.title()}",
            redirect_uri=redirect_uri,
        )

        # Raise with the full instruction message; the URL is embedded in it
        raise GoogleAuthenticationError(auth_response)

    try:
        service = build(service_name, version, credentials=credentials)
        log_user_email = user_google_email

        # Try to get email from credentials if needed for validation
        if credentials and credentials.id_token:
            try:
                # Decode without verification (just to get email for logging)
                decoded_token = jwt.decode(
                    credentials.id_token, options={"verify_signature": False}
                )
                token_email = decoded_token.get("email")
                if token_email:
                    log_user_email = token_email
                    logger.info(f"[{tool_name}] Token email: {token_email}")
            except Exception as e:
                logger.debug(f"[{tool_name}] Could not decode id_token: {e}")

        logger.info(
            f"[{tool_name}] Successfully authenticated {service_name} service for user: {log_user_email}"
        )
        return service, log_user_email

    except Exception as e:
        error_msg = f"[{tool_name}] Failed to build {service_name} service: {str(e)}"
        logger.error(error_msg, exc_info=True)
        raise GoogleAuthenticationError(error_msg) from e

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/taylorwilsdon/google_workspace_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.