"""
Google OAuth2 Authentication Module
Handles authentication for all Google Workspace APIs
"""
import os
import json
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
# OAuth2 scopes requested for every Google Workspace API used through this module.
SCOPES = [
    # Gmail
    "https://www.googleapis.com/auth/gmail.readonly",
    "https://www.googleapis.com/auth/gmail.send",
    "https://www.googleapis.com/auth/gmail.modify",
    # Calendar
    "https://www.googleapis.com/auth/calendar",
    "https://www.googleapis.com/auth/calendar.events",
    # Sheets
    "https://www.googleapis.com/auth/spreadsheets",
    # Docs
    "https://www.googleapis.com/auth/documents",
    # Drive
    "https://www.googleapis.com/auth/drive",
    "https://www.googleapis.com/auth/drive.file",
]

# Both JSON files are looked up one directory above the one containing this module.
_PARENT_DIR = os.path.join(os.path.dirname(__file__), "..")
# Cached user token (written by save_token).
TOKEN_PATH = os.path.join(_PARENT_DIR, "tokens.json")
# OAuth2 client secrets file from Google Cloud Console.
CREDENTIALS_PATH = os.path.join(_PARENT_DIR, "credentials.json")
def get_credentials():
    """Return usable Google credentials, refreshing or re-authenticating as needed.

    Resolution order:
      1. Load the cached token from TOKEN_PATH when that file exists.
      2. If the cached credentials are valid, return them as-is.
      3. If they are expired and carry a refresh token, refresh them.
      4. Otherwise run the installed-app flow via ``run_local_server(port=0)``
         using the client secrets at CREDENTIALS_PATH.

    Credentials produced by step 3 or 4 are persisted with save_token().

    Returns:
        The loaded, refreshed, or newly obtained credentials object.

    Raises:
        FileNotFoundError: If an interactive login is required but
            CREDENTIALS_PATH does not exist.
    """
    cached = None
    if os.path.exists(TOKEN_PATH):
        cached = Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES)

    # Fast path: the cached credentials are still good, nothing to persist.
    if cached and cached.valid:
        return cached

    if cached and cached.expired and cached.refresh_token:
        cached.refresh(Request())
        fresh = cached
    else:
        if not os.path.exists(CREDENTIALS_PATH):
            raise FileNotFoundError(
                f"credentials.json not found at {CREDENTIALS_PATH}. "
                "Please create it with your OAuth2 credentials from Google Cloud Console."
            )
        login_flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_PATH, SCOPES)
        fresh = login_flow.run_local_server(port=0)

    save_token(fresh)
    return fresh
def save_token(creds):
    """Persist credentials to the token file at TOKEN_PATH.

    Args:
        creds: Credentials object exposing ``to_json()``; the returned JSON
            text is written to the file unchanged.

    The JSON is produced *before* the file is opened for writing: opening
    with mode ``"w"`` truncates the file, so serializing first ensures that a
    failure inside ``to_json()`` cannot wipe out a previously saved token.
    """
    serialized = creds.to_json()
    with open(TOKEN_PATH, "w", encoding="utf-8") as token_file:
        token_file.write(serialized)
def get_auth_url():
    """Build the authorization URL that starts the OAuth2 flow.

    Returns:
        The URL string produced by ``flow.authorization_url``; the second
        element of that call's result is discarded.

    Raises:
        FileNotFoundError: If CREDENTIALS_PATH does not exist.
    """
    if not os.path.exists(CREDENTIALS_PATH):
        raise FileNotFoundError("credentials.json not found")

    url_options = {
        "access_type": "offline",
        "include_granted_scopes": "true",
        # Force the consent screen so newly added scopes get granted.
        "prompt": "consent",
    }
    # NOTE(review): no redirect_uri is set on the flow in this function --
    # confirm the generated URL is accepted by Google for this client type.
    oauth_flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_PATH, SCOPES)
    consent_url, _state = oauth_flow.authorization_url(**url_options)
    return consent_url
def get_token_from_code(code):
    """Trade an authorization code for credentials and store them.

    Args:
        code: Authorization code handed to ``flow.fetch_token``.

    Returns:
        The flow's credentials, after they have been written with save_token().
    """
    # NOTE(review): a brand-new flow is built here rather than reusing the one
    # that generated the authorization URL -- confirm the exchange does not
    # depend on state held by that earlier flow object.
    exchange_flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_PATH, SCOPES)
    exchange_flow.fetch_token(code=code)

    new_creds = exchange_flow.credentials
    save_token(new_creds)
    return new_creds
def is_authorized():
    """Report whether valid credentials can be obtained.

    This delegates to get_credentials(), so the check itself may refresh an
    expired token or start the interactive login flow. Any exception raised
    along the way is treated as "not authorized".

    Returns:
        The truthiness of ``creds and creds.valid``, or False on any error.
    """
    try:
        credentials = get_credentials()
        authorized = credentials and credentials.valid
    except Exception:
        authorized = False
    return authorized
def revoke_credentials():
    """Revoke the stored credentials with Google and delete the token file.

    When TOKEN_PATH exists, the stored credentials are loaded, a revocation
    request is POSTed to Google's revoke endpoint, and the token file is then
    removed. When no token file exists the function is a no-op that still
    reports success.

    The HTTP response status of the revoke call is not inspected; only
    exceptions (network errors, timeouts, unreadable token file) turn the
    result into a failure, in which case the token file is left in place.

    Returns:
        dict: ``{"success": True, "message": ...}`` on success, or
        ``{"success": False, "error": <message>}`` if anything raised.
    """
    try:
        if os.path.exists(TOKEN_PATH):
            creds = Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES)
            # Prefer the refresh token: the stored access token is short-lived
            # and is usually expired by the time the user asks to revoke.
            # Fall back to the access token when no refresh token was stored.
            revocable = (creds.refresh_token or creds.token) if creds else None
            if revocable:
                import requests

                requests.post(
                    "https://oauth2.googleapis.com/revoke",
                    params={"token": revocable},
                    headers={"content-type": "application/x-www-form-urlencoded"},
                    # Without a timeout an unresponsive endpoint would block
                    # the caller indefinitely.
                    timeout=10,
                )
            os.remove(TOKEN_PATH)
        return {"success": True, "message": "Credentials revoked and token deleted"}
    except Exception as error:
        return {"success": False, "error": str(error)}
def get_current_scopes():
    """Report the scopes recorded in the token file next to the required ones.

    Returns:
        dict: On success ``{"success": True, "scopes": [...],
        "required_scopes": SCOPES}``, where ``scopes`` is the token file's
        ``"scopes"`` entry (empty list when the key is absent). When the
        token file is missing or cannot be read/parsed, returns
        ``{"success": False, "error": <message>}``.
    """
    try:
        if not os.path.exists(TOKEN_PATH):
            return {"success": False, "error": "No token file found"}

        with open(TOKEN_PATH, "r") as token_file:
            stored = json.load(token_file)

        granted = stored.get("scopes", [])
        return {"success": True, "scopes": granted, "required_scopes": SCOPES}
    except Exception as error:
        return {"success": False, "error": str(error)}
def needs_reauth():
    """Tell whether the saved token lacks any of the required scopes.

    Returns:
        bool: True when the token file is missing, cannot be read or parsed,
        or its ``"scopes"`` entry does not include every scope in SCOPES;
        False when all required scopes are present.
    """
    try:
        if os.path.exists(TOKEN_PATH):
            with open(TOKEN_PATH, "r") as token_file:
                granted = set(json.load(token_file).get("scopes", []))
            # Re-auth is needed as soon as one required scope is absent.
            return not set(SCOPES).issubset(granted)
        return True
    except Exception:
        # Any failure to inspect the token is treated as "must re-authenticate".
        return True
def force_reauth():
    """Discard the saved token and authenticate again from scratch.

    The token file is removed first so that get_credentials() cannot reuse
    it and has to obtain new credentials.

    Returns:
        dict: ``{"success": True, "message": ..., "scopes": SCOPES}`` once
        get_credentials() completes, or ``{"success": False, "error":
        <message>}`` if removing the file or authenticating raised.
    """
    try:
        # Drop the cached token so the next lookup starts a new login.
        if os.path.exists(TOKEN_PATH):
            os.remove(TOKEN_PATH)
        get_credentials()
    except Exception as error:
        return {"success": False, "error": str(error)}

    return {
        "success": True,
        "message": "Re-authenticated successfully with new scopes",
        "scopes": SCOPES,
    }