Skip to main content
Glama
google_auth.py (5.3 kB)
""" Google OAuth2 Authentication Module Handles authentication for all Google Workspace APIs """ import os import json from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build # Full Google Workspace scopes SCOPES = [ # Gmail 'https://www.googleapis.com/auth/gmail.readonly', 'https://www.googleapis.com/auth/gmail.send', 'https://www.googleapis.com/auth/gmail.modify', # Calendar 'https://www.googleapis.com/auth/calendar', 'https://www.googleapis.com/auth/calendar.events', # Sheets 'https://www.googleapis.com/auth/spreadsheets', # Docs 'https://www.googleapis.com/auth/documents', # Drive 'https://www.googleapis.com/auth/drive', 'https://www.googleapis.com/auth/drive.file', ] TOKEN_PATH = os.path.join(os.path.dirname(__file__), '..', 'tokens.json') CREDENTIALS_PATH = os.path.join(os.path.dirname(__file__), '..', 'credentials.json') def get_credentials(): """Get valid credentials, refreshing or re-authenticating as needed""" creds = None if os.path.exists(TOKEN_PATH): creds = Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: if not os.path.exists(CREDENTIALS_PATH): raise FileNotFoundError( f'credentials.json not found at {CREDENTIALS_PATH}. ' 'Please create it with your OAuth2 credentials from Google Cloud Console.' 
) flow = InstalledAppFlow.from_client_secrets_file( CREDENTIALS_PATH, SCOPES) creds = flow.run_local_server(port=0) save_token(creds) return creds def save_token(creds): """Save credentials to token file""" with open(TOKEN_PATH, 'w') as token: token.write(creds.to_json()) def get_auth_url(): """Get authorization URL for OAuth2 flow""" if not os.path.exists(CREDENTIALS_PATH): raise FileNotFoundError('credentials.json not found') flow = InstalledAppFlow.from_client_secrets_file( CREDENTIALS_PATH, SCOPES) authorization_url, _ = flow.authorization_url( access_type='offline', include_granted_scopes='true', prompt='consent' # Force consent to get new scopes ) return authorization_url def get_token_from_code(code): """Exchange authorization code for credentials""" flow = InstalledAppFlow.from_client_secrets_file( CREDENTIALS_PATH, SCOPES) flow.fetch_token(code=code) creds = flow.credentials save_token(creds) return creds def is_authorized(): """Check if user is authorized with valid credentials""" try: creds = get_credentials() return creds and creds.valid except Exception: return False def revoke_credentials(): """Revoke current credentials and delete token file""" try: if os.path.exists(TOKEN_PATH): creds = Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES) if creds: import requests requests.post( 'https://oauth2.googleapis.com/revoke', params={'token': creds.token}, headers={'content-type': 'application/x-www-form-urlencoded'} ) os.remove(TOKEN_PATH) return {'success': True, 'message': 'Credentials revoked and token deleted'} except Exception as error: return {'success': False, 'error': str(error)} def get_current_scopes(): """Get the scopes from current token""" try: if os.path.exists(TOKEN_PATH): with open(TOKEN_PATH, 'r') as f: token_data = json.load(f) return { 'success': True, 'scopes': token_data.get('scopes', []), 'required_scopes': SCOPES } return {'success': False, 'error': 'No token file found'} except Exception as error: return {'success': False, 
'error': str(error)} def needs_reauth(): """Check if re-authentication is needed for new scopes""" try: if not os.path.exists(TOKEN_PATH): return True with open(TOKEN_PATH, 'r') as f: token_data = json.load(f) current_scopes = set(token_data.get('scopes', [])) required_scopes = set(SCOPES) # Check if all required scopes are present missing = required_scopes - current_scopes return len(missing) > 0 except Exception: return True def force_reauth(): """Force re-authentication to get new scopes""" try: # Delete existing token if os.path.exists(TOKEN_PATH): os.remove(TOKEN_PATH) # Get new credentials creds = get_credentials() return { 'success': True, 'message': 'Re-authenticated successfully with new scopes', 'scopes': SCOPES } except Exception as error: return {'success': False, 'error': str(error)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pbulbule13/google-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.