Skip to main content
Glama
kalpalathika

MCP Enhanced Data Retrieval System

by kalpalathika
oauth.py7.78 kB
""" OAuth 2.1 Authentication for GitHub. This module implements the OAuth 2.1 flow for GitHub authentication, meeting Milestone 1 requirements for secure authentication. FLOW: 1. User visits /auth/login 2. Redirects to GitHub authorization page 3. User approves access 4. GitHub redirects to /auth/callback with code 5. Exchange code for access token 6. Store token for future use """ import httpx import structlog import json from pathlib import Path from typing import Optional, Dict, Any from datetime import datetime from config import settings logger = structlog.get_logger() # ============================================================================ # SECTION 1: TOKEN STORAGE # ============================================================================ class TokenStorage: """ Simple file-based token storage. Stores OAuth tokens persistently so users don't need to re-authenticate every time the server restarts. """ def __init__(self, storage_path: str = "data/tokens"): """ Initialize token storage. Args: storage_path: Directory to store token files """ self.storage_path = Path(storage_path) self.storage_path.mkdir(parents=True, exist_ok=True) self.token_file = self.storage_path / "github_oauth_token.json" logger.info("Token storage initialized", path=str(self.token_file)) def save_token(self, token_data: Dict[str, Any]) -> None: """ Save OAuth token to file. Args: token_data: Token information from GitHub """ # Add timestamp token_data["stored_at"] = datetime.utcnow().isoformat() # Write to file with open(self.token_file, 'w') as f: json.dump(token_data, f, indent=2) logger.info("OAuth token saved", file=str(self.token_file)) def load_token(self) -> Optional[Dict[str, Any]]: """ Load OAuth token from file. Returns: Token data if exists, None otherwise """ if not self.token_file.exists(): logger.info("No stored token found") return None try: with open(self.token_file, 'r') as f: token_data = json.load(f) logger.info("OAuth token loaded", stored_at=token_data.get("stored_at")) return token_data except Exception as e: logger.error("Failed to load token", error=str(e)) return None def delete_token(self) -> None: """Delete stored token (logout).""" if self.token_file.exists(): self.token_file.unlink() logger.info("OAuth token deleted") def has_token(self) -> bool: """Check if we have a stored token.""" return self.token_file.exists() # ============================================================================ # SECTION 2: OAUTH FLOW MANAGER # ============================================================================ class GitHubOAuthManager: """ Manages GitHub OAuth 2.1 authentication flow. Handles: - Authorization URL generation - Token exchange - Token storage and retrieval """ # GitHub OAuth URLs AUTHORIZE_URL = "https://github.com/login/oauth/authorize" TOKEN_URL = "https://github.com/login/oauth/access_token" def __init__(self): """Initialize OAuth manager.""" self.client_id = settings.github_client_id self.client_secret = settings.github_client_secret self.redirect_uri = settings.github_redirect_uri self.storage = TokenStorage() logger.info("GitHub OAuth manager initialized", client_id=self.client_id[:10] + "...", redirect_uri=self.redirect_uri) def get_authorization_url(self, state: Optional[str] = None) -> str: """ Generate GitHub authorization URL. This is where we redirect the user to log in to GitHub. Args: state: Optional state parameter for CSRF protection Returns: Authorization URL to redirect user to """ # Build authorization URL params = { "client_id": self.client_id, "redirect_uri": self.redirect_uri, "scope": "repo user", # Request repo and user access } if state: params["state"] = state # Construct URL param_str = "&".join([f"{k}={v}" for k, v in params.items()]) auth_url = f"{self.AUTHORIZE_URL}?{param_str}" logger.info("Generated authorization URL", scope=params["scope"]) return auth_url async def exchange_code_for_token(self, code: str) -> Dict[str, Any]: """ Exchange authorization code for access token. After user approves on GitHub, we get a code. Exchange it for an access token that we can use to call GitHub API. Args: code: Authorization code from GitHub callback Returns: Token data from GitHub Raises: Exception: If token exchange fails """ logger.info("Exchanging authorization code for token") # Prepare request data = { "client_id": self.client_id, "client_secret": self.client_secret, "code": code, "redirect_uri": self.redirect_uri, } headers = { "Accept": "application/json" } # Exchange code for token async with httpx.AsyncClient() as client: response = await client.post( self.TOKEN_URL, data=data, headers=headers ) if response.status_code != 200: logger.error("Token exchange failed", status=response.status_code, response=response.text) raise Exception(f"Failed to exchange code for token: {response.text}") token_data = response.json() # Check for errors if "error" in token_data: logger.error("OAuth error", error=token_data["error"]) raise Exception(f"OAuth error: {token_data['error']}") logger.info("Successfully exchanged code for token", token_type=token_data.get("token_type"), scope=token_data.get("scope")) # Save token self.storage.save_token(token_data) return token_data def get_stored_token(self) -> Optional[str]: """ Get stored access token. Returns: Access token if available, None otherwise """ token_data = self.storage.load_token() if token_data: return token_data.get("access_token") return None def has_valid_token(self) -> bool: """ Check if we have a valid stored token. Returns: True if token exists (we don't check expiration since GitHub tokens don't expire) """ return self.storage.has_token() def logout(self) -> None: """Logout by deleting stored token.""" self.storage.delete_token() logger.info("User logged out") # ============================================================================ # SECTION 3: CONVENIENCE FUNCTIONS # ============================================================================ # Global OAuth manager instance _oauth_manager = None def get_oauth_manager() -> GitHubOAuthManager: """ Get or create the global OAuth manager. Returns: GitHubOAuthManager instance """ global _oauth_manager if _oauth_manager is None: _oauth_manager = GitHubOAuthManager() return _oauth_manager

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kalpalathika/MCP-Enhanced-Data-Retrieval-System'

If you have feedback or need assistance with the MCP directory API, please join our Discord server