# MCP Google Suite
# by adexltd
# Verified
"""Google OAuth authentication module."""
import asyncio
import os
from typing import Optional
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from mcp_google_suite.config import Config
# OAuth scopes requested during authentication and when loading stored
# credentials: one each for the Drive, Docs ("documents") and Sheets
# ("spreadsheets") APIs.
SCOPES = [
    f"https://www.googleapis.com/auth/{api}"
    for api in ("drive", "documents", "spreadsheets")
]
class GoogleAuth:
    """Handles Google OAuth2 authentication.

    Runs the interactive OAuth flow, stores the resulting user credentials at
    ``config.credentials.expanded_server_credentials`` and hands them out to
    callers, refreshing expired credentials when a refresh token is available.
    """

    def __init__(self, config: Optional[Config] = None, config_path: Optional[str] = None):
        """Initialize authentication with optional config or config_path.

        Args:
            config: Configuration to use. When omitted, the configuration is
                loaded with ``Config.load(config_path)``.
            config_path: Passed to ``Config.load``; only used when ``config``
                is not supplied.
        """
        self.config = config or Config.load(config_path)
        self.creds: Optional[Credentials] = None
        # Serializes get_credentials() so concurrent callers do not load or
        # refresh the credentials at the same time.
        self._creds_lock = asyncio.Lock()
        # Ensure credentials directory exists
        self.config.ensure_credentials_dir()

    async def authenticate(self) -> None:
        """Run the authentication flow and save credentials.

        Raises:
            FileNotFoundError: If the OAuth client secrets file does not exist.
        """
        oauth_creds_path = self.config.credentials.expanded_oauth_credentials
        server_creds_path = self.config.credentials.expanded_server_credentials

        if not os.path.exists(oauth_creds_path):
            raise FileNotFoundError(
                f"OAuth keys file not found at {oauth_creds_path}. "
                "Please follow these steps:\n"
                "1. Create a new Google Cloud project\n"
                "2. Enable the Google Drive, Docs, and Sheets APIs\n"
                "3. Configure OAuth consent screen\n"
                "4. Create OAuth Client ID for Desktop App\n"
                "5. Download the JSON file and save as 'oauth.keys.json'\n"
                " in the ~/.google directory"
            )

        print(f"Authenticating and saving credentials to {server_creds_path}")

        # Run the flow in a thread since it's blocking
        flow = await asyncio.to_thread(
            InstalledAppFlow.from_client_secrets_file, oauth_creds_path, SCOPES
        )
        # port=0 is passed through to run_local_server unchanged.
        self.creds = await asyncio.to_thread(flow.run_local_server, port=0)

        # Save the credentials
        await asyncio.to_thread(self._save_credentials)

        print("\nAuthentication successful!")
        print(f"Credentials saved to: {server_creds_path}")

    def _save_credentials(self) -> None:
        """Save credentials to file (helper method for async operations).

        Does nothing when no credentials are held. The file is created with
        owner-only permissions because it contains OAuth tokens; the
        permissions of an already existing file are left untouched.
        """
        if not self.creds:
            return

        path = self.config.credentials.expanded_server_credentials
        # os.open lets us pass the creation mode (0o600) atomically, instead
        # of creating a world-readable file and tightening it afterwards.
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(self.creds.to_json())

    async def _refresh_and_save(self) -> Credentials:
        """Refresh ``self.creds`` in a worker thread, persist and return them."""
        await asyncio.to_thread(self.creds.refresh, Request())
        await asyncio.to_thread(self._save_credentials)
        return self.creds

    async def get_credentials(self) -> Credentials:
        """Get and refresh Google OAuth2 credentials asynchronously.

        Order of attempts: the in-memory credentials, then the credentials
        file on disk. Expired credentials with a refresh token are refreshed
        and written back to disk.

        Returns:
            Credentials that were valid or have just been refreshed.

        Raises:
            FileNotFoundError: If no usable credentials are cached or stored.
        """
        async with self._creds_lock:
            if self.creds and self.creds.valid:
                return self.creds

            if self.creds and self.creds.expired and self.creds.refresh_token:
                return await self._refresh_and_save()

            # Try to load saved credentials
            server_creds_path = self.config.credentials.expanded_server_credentials
            if os.path.exists(server_creds_path):
                self.creds = await asyncio.to_thread(
                    Credentials.from_authorized_user_file, server_creds_path, SCOPES
                )
                if self.creds.valid:
                    return self.creds
                if self.creds.expired and self.creds.refresh_token:
                    return await self._refresh_and_save()

            raise FileNotFoundError(
                "No valid credentials found. "
                "Please run authentication first: python -m mcp_google_suite auth"
            )

    async def is_authorized(self) -> bool:
        """Check if we have valid credentials asynchronously.

        Returns:
            True when ``get_credentials`` succeeds, False when it raises
            ``FileNotFoundError``. Other exceptions propagate.
        """
        try:
            await self.get_credentials()
        except FileNotFoundError:
            return False
        return True

    @property
    def authorized(self) -> bool:
        """Synchronous check for valid credentials (use is_authorized for async code).

        Raises:
            RuntimeError: If called from a thread that is already running an
                event loop; blocking on the check there is not possible.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread, so we may block on one.
            pass
        else:
            raise RuntimeError(
                "GoogleAuth.authorized cannot be used while an event loop is "
                "running; use 'await is_authorized()' instead"
            )

        # Use a private loop rather than asyncio.get_event_loop() (deprecated
        # when no loop is set) or asyncio.run() (which resets the thread's
        # current loop on some Python versions and could surprise callers).
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(self.is_authorized())
        finally:
            loop.close()