"""
Google OAuth2 Authentication Module
Handles authentication for all Google Workspace APIs
"""
import os
import json
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
# Full Google Workspace scopes
SCOPES = [
# Gmail
'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/gmail.send',
'https://www.googleapis.com/auth/gmail.modify',
# Calendar
'https://www.googleapis.com/auth/calendar',
'https://www.googleapis.com/auth/calendar.events',
# Sheets
'https://www.googleapis.com/auth/spreadsheets',
# Docs
'https://www.googleapis.com/auth/documents',
# Drive
'https://www.googleapis.com/auth/drive',
'https://www.googleapis.com/auth/drive.file',
]
TOKEN_PATH = os.path.join(os.path.dirname(__file__), '..', 'tokens.json')
CREDENTIALS_PATH = os.path.join(os.path.dirname(__file__), '..', 'credentials.json')
def get_credentials():
"""Get valid credentials, refreshing or re-authenticating as needed"""
creds = None
if os.path.exists(TOKEN_PATH):
creds = Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
if not os.path.exists(CREDENTIALS_PATH):
raise FileNotFoundError(
f'credentials.json not found at {CREDENTIALS_PATH}. '
'Please create it with your OAuth2 credentials from Google Cloud Console.'
)
flow = InstalledAppFlow.from_client_secrets_file(
CREDENTIALS_PATH, SCOPES)
creds = flow.run_local_server(port=0)
save_token(creds)
return creds
def save_token(creds):
"""Save credentials to token file"""
with open(TOKEN_PATH, 'w') as token:
token.write(creds.to_json())
def get_auth_url():
"""Get authorization URL for OAuth2 flow"""
if not os.path.exists(CREDENTIALS_PATH):
raise FileNotFoundError('credentials.json not found')
flow = InstalledAppFlow.from_client_secrets_file(
CREDENTIALS_PATH, SCOPES)
authorization_url, _ = flow.authorization_url(
access_type='offline',
include_granted_scopes='true',
prompt='consent' # Force consent to get new scopes
)
return authorization_url
def get_token_from_code(code):
"""Exchange authorization code for credentials"""
flow = InstalledAppFlow.from_client_secrets_file(
CREDENTIALS_PATH, SCOPES)
flow.fetch_token(code=code)
creds = flow.credentials
save_token(creds)
return creds
def is_authorized():
"""Check if user is authorized with valid credentials"""
try:
creds = get_credentials()
return creds and creds.valid
except Exception:
return False
def revoke_credentials():
"""Revoke current credentials and delete token file"""
try:
if os.path.exists(TOKEN_PATH):
creds = Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES)
if creds:
import requests
requests.post(
'https://oauth2.googleapis.com/revoke',
params={'token': creds.token},
headers={'content-type': 'application/x-www-form-urlencoded'}
)
os.remove(TOKEN_PATH)
return {'success': True, 'message': 'Credentials revoked and token deleted'}
except Exception as error:
return {'success': False, 'error': str(error)}
def get_current_scopes():
"""Get the scopes from current token"""
try:
if os.path.exists(TOKEN_PATH):
with open(TOKEN_PATH, 'r') as f:
token_data = json.load(f)
return {
'success': True,
'scopes': token_data.get('scopes', []),
'required_scopes': SCOPES
}
return {'success': False, 'error': 'No token file found'}
except Exception as error:
return {'success': False, 'error': str(error)}
def needs_reauth():
"""Check if re-authentication is needed for new scopes"""
try:
if not os.path.exists(TOKEN_PATH):
return True
with open(TOKEN_PATH, 'r') as f:
token_data = json.load(f)
current_scopes = set(token_data.get('scopes', []))
required_scopes = set(SCOPES)
# Check if all required scopes are present
missing = required_scopes - current_scopes
return len(missing) > 0
except Exception:
return True
def force_reauth():
"""Force re-authentication to get new scopes"""
try:
# Delete existing token
if os.path.exists(TOKEN_PATH):
os.remove(TOKEN_PATH)
# Get new credentials
creds = get_credentials()
return {
'success': True,
'message': 'Re-authenticated successfully with new scopes',
'scopes': SCOPES
}
except Exception as error:
return {'success': False, 'error': str(error)}