"""
OAuth 2.1 Authentication for GitHub.
This module implements the OAuth 2.1 flow for GitHub authentication,
meeting Milestone 1 requirements for secure authentication.
FLOW:
1. User visits /auth/login
2. Redirects to GitHub authorization page
3. User approves access
4. GitHub redirects to /auth/callback with code
5. Exchange code for access token
6. Store token for future use
"""
import json
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any
from urllib.parse import urlencode

import httpx
import structlog

from config import settings
# Module-level structlog logger used by TokenStorage and GitHubOAuthManager below.
logger = structlog.get_logger()
# ============================================================================
# SECTION 1: TOKEN STORAGE
# ============================================================================
class TokenStorage:
    """
    Simple file-based token storage.

    Stores OAuth tokens persistently so users don't need to re-authenticate
    every time the server restarts. The token is kept as JSON in a single
    file (``github_oauth_token.json``) inside the storage directory.
    """

    def __init__(self, storage_path: str = "data/tokens"):
        """
        Initialize token storage.

        Creates the storage directory (and any missing parents) if needed.

        Args:
            storage_path: Directory to store token files
        """
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.token_file = self.storage_path / "github_oauth_token.json"
        logger.info("Token storage initialized", path=str(self.token_file))

    def save_token(self, token_data: Dict[str, Any]) -> None:
        """
        Save OAuth token to file.

        Note that ``token_data`` is modified in place: a ``stored_at`` key
        holding the current UTC time (ISO 8601) is added before writing.

        Args:
            token_data: Token information from GitHub
        """
        # Add timestamp
        token_data["stored_at"] = datetime.utcnow().isoformat()

        # The file contains a secret: make sure it exists with owner-only
        # permissions *before* any token content is written to it. touch()
        # covers the "new file" case, chmod() tightens a pre-existing file.
        self.token_file.touch(mode=0o600, exist_ok=True)
        self.token_file.chmod(0o600)

        # Write to file
        with open(self.token_file, 'w', encoding="utf-8") as f:
            json.dump(token_data, f, indent=2)
        logger.info("OAuth token saved", file=str(self.token_file))

    def load_token(self) -> Optional[Dict[str, Any]]:
        """
        Load OAuth token from file.

        Returns:
            Token data if the file exists and contains a JSON object,
            None otherwise (missing, unreadable or malformed file).
        """
        # EAFP: open directly instead of exists()-then-open, so a file that
        # disappears between the two steps is handled like a missing file.
        try:
            with open(self.token_file, 'r', encoding="utf-8") as f:
                token_data = json.load(f)
        except FileNotFoundError:
            logger.info("No stored token found")
            return None
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.error("Failed to load token", error=str(e))
            return None

        if not isinstance(token_data, dict):
            logger.error("Failed to load token",
                         error="token file does not contain a JSON object")
            return None

        logger.info("OAuth token loaded", stored_at=token_data.get("stored_at"))
        return token_data

    def delete_token(self) -> None:
        """Delete stored token (logout). Does nothing if no token is stored."""
        try:
            self.token_file.unlink()
        except FileNotFoundError:
            # Nothing to delete; not an error.
            return
        logger.info("OAuth token deleted")

    def has_token(self) -> bool:
        """Check if we have a stored token (i.e. the token file exists)."""
        return self.token_file.exists()
# ============================================================================
# SECTION 2: OAUTH FLOW MANAGER
# ============================================================================
class GitHubOAuthManager:
    """
    Manages GitHub OAuth authentication flow.

    Handles:
    - Authorization URL generation
    - Token exchange
    - Token storage and retrieval
    """

    # GitHub OAuth URLs
    AUTHORIZE_URL = "https://github.com/login/oauth/authorize"
    TOKEN_URL = "https://github.com/login/oauth/access_token"

    def __init__(self):
        """Initialize OAuth manager from application settings."""
        self.client_id = settings.github_client_id
        self.client_secret = settings.github_client_secret
        self.redirect_uri = settings.github_redirect_uri
        self.storage = TokenStorage()
        # Only a prefix of the client id is logged; tolerate an unset
        # (None) client id so a log statement cannot crash construction.
        logger.info("GitHub OAuth manager initialized",
                    client_id=(self.client_id or "")[:10] + "...",
                    redirect_uri=self.redirect_uri)

    def get_authorization_url(self, state: Optional[str] = None) -> str:
        """
        Generate GitHub authorization URL.

        This is where we redirect the user to log in to GitHub.

        Args:
            state: Optional state parameter for CSRF protection. It is
                only added to the URL when it is a non-empty value.

        Returns:
            Authorization URL to redirect user to
        """
        # Build authorization URL
        params = {
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "scope": "repo user",  # Request repo and user access
        }
        if state:
            params["state"] = state

        # urlencode percent-encodes every value, so the space in the scope
        # list, the redirect URI and an arbitrary state string cannot break
        # (or inject extra parameters into) the query string.
        auth_url = f"{self.AUTHORIZE_URL}?{urlencode(params)}"
        logger.info("Generated authorization URL", scope=params["scope"])
        return auth_url

    async def exchange_code_for_token(self, code: str) -> Dict[str, Any]:
        """
        Exchange authorization code for access token.

        After user approves on GitHub, we get a code. Exchange it for
        an access token that we can use to call GitHub API. On success the
        token data is persisted through the token storage.

        Args:
            code: Authorization code from GitHub callback

        Returns:
            Token data from GitHub

        Raises:
            Exception: If token exchange fails (non-200 response, an
                ``error`` field in the response, or no ``access_token``)
        """
        logger.info("Exchanging authorization code for token")

        # Prepare request
        data = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "code": code,
            "redirect_uri": self.redirect_uri,
        }
        headers = {
            "Accept": "application/json"
        }

        # Exchange code for token
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.TOKEN_URL,
                data=data,
                headers=headers
            )

        if response.status_code != 200:
            logger.error("Token exchange failed",
                         status=response.status_code,
                         response=response.text)
            raise Exception(f"Failed to exchange code for token: {response.text}")

        token_data = response.json()

        # Check for errors reported in the response body
        if "error" in token_data:
            logger.error("OAuth error", error=token_data["error"])
            raise Exception(f"OAuth error: {token_data['error']}")

        # Never persist a response that carries no usable token.
        if not token_data.get("access_token"):
            logger.error("OAuth error", error="missing access_token")
            raise Exception("OAuth error: response did not contain an access_token")

        logger.info("Successfully exchanged code for token",
                    token_type=token_data.get("token_type"),
                    scope=token_data.get("scope"))

        # Save token
        self.storage.save_token(token_data)
        return token_data

    def get_stored_token(self) -> Optional[str]:
        """
        Get stored access token.

        Returns:
            Access token if available, None otherwise
        """
        token_data = self.storage.load_token()
        if token_data:
            return token_data.get("access_token")
        return None

    def has_valid_token(self) -> bool:
        """
        Check if we have a usable stored token.

        Agrees with get_stored_token(): a token file that is missing,
        unreadable or lacks an access token counts as "no token".
        Expiration is not checked.
        NOTE(review): the original author states GitHub tokens don't
        expire - confirm for the kind of GitHub app in use.

        Returns:
            True if a stored access token can be read, False otherwise
        """
        return bool(self.get_stored_token())

    def logout(self) -> None:
        """Logout by deleting stored token."""
        self.storage.delete_token()
        logger.info("User logged out")
# ============================================================================
# SECTION 3: CONVENIENCE FUNCTIONS
# ============================================================================
# Process-wide OAuth manager; stays None until get_oauth_manager() is first called
_oauth_manager: Optional["GitHubOAuthManager"] = None


def get_oauth_manager() -> GitHubOAuthManager:
    """
    Return the shared OAuth manager, constructing it on first use.

    Returns:
        The single GitHubOAuthManager instance held by this module
    """
    global _oauth_manager

    # Fast path: the manager has already been built.
    existing = _oauth_manager
    if existing is not None:
        return existing

    # First call: build the manager and remember it for later callers.
    created = GitHubOAuthManager()
    _oauth_manager = created
    return created