Skip to main content
Glama
auth.py6.31 kB
"""OAuth2 authentication for Withings API.""" import os from typing import Optional from pathlib import Path import httpx from datetime import datetime, timedelta class WithingsAuth: """Handles OAuth2 authentication for Withings API.""" BASE_URL = "https://wbsapi.withings.net" AUTH_URL = "https://account.withings.com/oauth2_user/authorize2" TOKEN_URL = f"{BASE_URL}/v2/oauth2" def __init__(self, env_file: Optional[str] = None): self.env_file = Path(env_file) if env_file else self._find_env_file() self.client_id = os.getenv("WITHINGS_CLIENT_ID") self.client_secret = os.getenv("WITHINGS_CLIENT_SECRET") self.redirect_uri = os.getenv("WITHINGS_REDIRECT_URI", "http://localhost:8080/callback") self.access_token: Optional[str] = None self.refresh_token: Optional[str] = None self.token_expires_at: Optional[datetime] = None # Load tokens from environment if available self.access_token = os.getenv("WITHINGS_ACCESS_TOKEN") self.refresh_token = os.getenv("WITHINGS_REFRESH_TOKEN") def _find_env_file(self) -> Path: """Find .env file in current directory or parent directories.""" current = Path.cwd() while current != current.parent: env_path = current / ".env" if env_path.exists(): return env_path current = current.parent # If not found, return .env in current directory return Path.cwd() / ".env" def get_authorization_url(self, scope: str = "user.info,user.metrics,user.activity") -> str: """Generate OAuth2 authorization URL.""" params = { "response_type": "code", "client_id": self.client_id, "redirect_uri": self.redirect_uri, "scope": scope, "state": "random_state_string" } query = "&".join(f"{k}={v}" for k, v in params.items()) return f"{self.AUTH_URL}?{query}" async def exchange_code_for_token(self, code: str, save_to_env: bool = True) -> dict: """Exchange authorization code for access token.""" async with httpx.AsyncClient() as client: response = await client.post( self.TOKEN_URL, data={ "action": "requesttoken", "grant_type": "authorization_code", "client_id": self.client_id, "client_secret": self.client_secret, "code": code, "redirect_uri": self.redirect_uri, } ) response.raise_for_status() data = response.json() if data.get("status") == 0: body = data["body"] self.access_token = body["access_token"] self.refresh_token = body["refresh_token"] expires_in = body.get("expires_in", 3600) self.token_expires_at = datetime.now() + timedelta(seconds=expires_in) # Automatically save tokens to .env if save_to_env: self._save_tokens_to_env() return body else: raise Exception(f"Token exchange failed: {data}") async def refresh_access_token(self, save_to_env: bool = True) -> dict: """Refresh the access token using refresh token.""" if not self.refresh_token: raise Exception("No refresh token available") async with httpx.AsyncClient() as client: response = await client.post( self.TOKEN_URL, data={ "action": "requesttoken", "grant_type": "refresh_token", "client_id": self.client_id, "client_secret": self.client_secret, "refresh_token": self.refresh_token, } ) response.raise_for_status() data = response.json() if data.get("status") == 0: body = data["body"] self.access_token = body["access_token"] self.refresh_token = body["refresh_token"] expires_in = body.get("expires_in", 3600) self.token_expires_at = datetime.now() + timedelta(seconds=expires_in) # Automatically save refreshed tokens to .env if save_to_env: self._save_tokens_to_env() return body else: raise Exception(f"Token refresh failed: {data}") async def ensure_valid_token(self): """Ensure we have a valid access token, refreshing if necessary.""" if not self.access_token: raise Exception("No access token available. Please authenticate first.") # Refresh if token expires in less than 5 minutes if self.token_expires_at and datetime.now() >= self.token_expires_at - timedelta(minutes=5): await self.refresh_access_token() def get_headers(self) -> dict: """Get authorization headers for API requests.""" if not self.access_token: raise Exception("No access token available") return {"Authorization": f"Bearer {self.access_token}"} def _save_tokens_to_env(self): """Save tokens to .env file.""" if not self.access_token or not self.refresh_token: return # Read existing .env content env_content = {} if self.env_file.exists(): with open(self.env_file, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) env_content[key.strip()] = value.strip() # Update tokens env_content['WITHINGS_ACCESS_TOKEN'] = self.access_token env_content['WITHINGS_REFRESH_TOKEN'] = self.refresh_token # Write back to file with open(self.env_file, 'w') as f: for key, value in env_content.items(): f.write(f"{key}={value}\n") async def save_tokens(self): """Save current tokens to .env file.""" self._save_tokens_to_env()

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/schimmmi/withings-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server