Skip to main content
Glama
vitalune

Personal Knowledge Assistant

by vitalune
client_manager.py21.3 kB
""" API Client Manager This module provides a centralized manager for all API clients with: - Unified authentication flow for multiple services - Client lifecycle management - Health monitoring and status reporting - Coordinated rate limiting across services - Bulk operations and data synchronization - Configuration management for all clients """ import asyncio import json from datetime import datetime, timezone from typing import Dict, Any, List, Optional, Union from pathlib import Path from dataclasses import dataclass, field import structlog from .base_client import get_client_registry from .gmail_client import GmailClient from .drive_client import DriveClient from .twitter_client import TwitterClient from .linkedin_client import LinkedInClient from ..config.auth import AuthProvider, get_auth_manager from ..config.settings import get_settings from ..utils.rate_limiter import get_rate_limit_manager logger = structlog.get_logger(__name__) @dataclass class ClientConfig: """Configuration for an API client""" client_id: str client_secret: str scopes: List[str] = field(default_factory=list) enabled: bool = True auto_refresh: bool = True custom_settings: Dict[str, Any] = field(default_factory=dict) @dataclass class ServiceStatus: """Status information for a service""" service_name: str authenticated: bool healthy: bool last_check: datetime error_message: Optional[str] = None rate_limit_status: Dict[str, Any] = field(default_factory=dict) metrics: Dict[str, Any] = field(default_factory=dict) class APIClientManager: """Centralized manager for all API clients""" def __init__(self): self.settings = get_settings() self.auth_manager = get_auth_manager() self.rate_limiter = get_rate_limit_manager() self.client_registry = get_client_registry() # Client instances self._clients: Dict[str, Any] = {} self._client_configs: Dict[str, ClientConfig] = {} # Status tracking self._service_status: Dict[str, ServiceStatus] = {} # Background tasks self._health_check_task: Optional[asyncio.Task] = None self._token_refresh_task: Optional[asyncio.Task] = None async def __aenter__(self): """Async context manager entry""" await self.initialize() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit""" await self.shutdown() async def initialize(self): """Initialize the client manager""" logger.info("Initializing API Client Manager") # Start background tasks self._health_check_task = asyncio.create_task(self._health_check_loop()) self._token_refresh_task = asyncio.create_task(self._token_refresh_loop()) logger.info("API Client Manager initialized successfully") async def shutdown(self): """Shutdown the client manager""" logger.info("Shutting down API Client Manager") # Cancel background tasks if self._health_check_task: self._health_check_task.cancel() try: await self._health_check_task except asyncio.CancelledError: pass if self._token_refresh_task: self._token_refresh_task.cancel() try: await self._token_refresh_task except asyncio.CancelledError: pass # Close all clients await self.client_registry.close_all() logger.info("API Client Manager shutdown complete") def configure_service( self, service: str, client_id: str, client_secret: str, scopes: Optional[List[str]] = None, **kwargs ): """Configure a service for use""" config = ClientConfig( client_id=client_id, client_secret=client_secret, scopes=scopes or [], **kwargs ) self._client_configs[service] = config logger.info(f"Configured service: {service}", scopes=scopes) async def create_client(self, service: str) -> Optional[Any]: """Create and register an API client""" if service not in self._client_configs: logger.error(f"Service not configured: {service}") return None config = self._client_configs[service] if not config.enabled: logger.info(f"Service disabled: {service}") return None try: # Create appropriate client if service == "gmail": client = GmailClient( client_id=config.client_id, client_secret=config.client_secret, scopes=config.scopes, **config.custom_settings ) elif service == "drive": client = DriveClient( client_id=config.client_id, client_secret=config.client_secret, scopes=config.scopes, **config.custom_settings ) elif service == "twitter": client = TwitterClient( client_id=config.client_id, client_secret=config.client_secret, scopes=config.scopes, **config.custom_settings ) elif service == "linkedin": client = LinkedInClient( client_id=config.client_id, client_secret=config.client_secret, scopes=config.scopes, **config.custom_settings ) else: logger.error(f"Unknown service: {service}") return None # Register client await self.client_registry.register_client(service, client) self._clients[service] = client # Initialize status tracking self._service_status[service] = ServiceStatus( service_name=service, authenticated=False, healthy=False, last_check=datetime.now(timezone.utc) ) logger.info(f"Created client for service: {service}") return client except Exception as e: logger.error(f"Failed to create client for {service}", error=str(e)) return None async def get_client(self, service: str) -> Optional[Any]: """Get an API client, creating if necessary""" if service in self._clients: return self._clients[service] return await self.create_client(service) async def authenticate_service(self, service: str, redirect_uri: str = None) -> str: """Start authentication flow for a service""" client = await self.get_client(service) if not client: raise ValueError(f"Client for {service} not available") redirect_uri = redirect_uri or "http://localhost:8080/oauth/callback" try: # This will raise an exception with the authorization URL await client.authenticate(redirect_uri) except Exception as e: # Extract auth URL from exception message auth_url = str(e).replace("Please visit this URL to authorize: ", "") logger.info(f"Authentication URL for {service}: {auth_url}") return auth_url async def handle_oauth_callback( self, service: str, authorization_code: str, state: str ) -> bool: """Handle OAuth callback for a service""" client = await self.get_client(service) if not client: return False success = await client.handle_oauth_callback(authorization_code, state) if success: # Update status if service in self._service_status: self._service_status[service].authenticated = True self._service_status[service].healthy = True self._service_status[service].last_check = datetime.now(timezone.utc) return success async def check_authentication(self, service: str) -> bool: """Check if a service is authenticated""" try: # Check if we have valid tokens tokens = await self.auth_manager.list_tokens( provider=AuthProvider(service) ) valid_tokens = [t for t in tokens if t["status"] == "valid"] return len(valid_tokens) > 0 except Exception as e: logger.error(f"Failed to check authentication for {service}", error=str(e)) return False async def refresh_tokens(self, service: Optional[str] = None) -> Dict[str, bool]: """Refresh tokens for one or all services""" results = {} services_to_refresh = [service] if service else self._clients.keys() for svc in services_to_refresh: if svc not in self._clients: continue try: client = self._clients[svc] success = await client.refresh_token() results[svc] = success logger.info(f"Token refresh for {svc}: {'success' if success else 'failed'}") except Exception as e: logger.error(f"Failed to refresh token for {svc}", error=str(e)) results[svc] = False return results async def get_service_status(self, service: Optional[str] = None) -> Dict[str, ServiceStatus]: """Get status for one or all services""" if service: return {service: self._service_status.get(service)} if service in self._service_status else {} return self._service_status.copy() async def get_overall_health(self) -> Dict[str, Any]: """Get overall health status of all services""" total_services = len(self._service_status) healthy_services = sum(1 for status in self._service_status.values() if status.healthy) authenticated_services = sum(1 for status in self._service_status.values() if status.authenticated) # Get rate limit status for all services rate_limit_status = await self.rate_limiter.get_all_status() # Get client health from registry client_health = await self.client_registry.get_all_health_status() return { "timestamp": datetime.now(timezone.utc).isoformat(), "overall_health": "healthy" if healthy_services == total_services else "degraded", "services": { "total": total_services, "healthy": healthy_services, "authenticated": authenticated_services, "unhealthy": total_services - healthy_services }, "rate_limits": rate_limit_status, "client_health": client_health, "service_details": { name: { "authenticated": status.authenticated, "healthy": status.healthy, "last_check": status.last_check.isoformat(), "error_message": status.error_message } for name, status in self._service_status.items() } } async def bulk_export( self, output_directory: Path, services: Optional[List[str]] = None, data_types: Optional[Dict[str, List[str]]] = None, max_items: int = 1000 ) -> Dict[str, int]: """Export data from multiple services""" logger.info(f"Starting bulk export", output_dir=str(output_directory), services=services) output_directory.mkdir(parents=True, exist_ok=True) results = {} services_to_export = services or list(self._clients.keys()) for service in services_to_export: if service not in self._clients: logger.warning(f"Service not available: {service}") continue if not await self.check_authentication(service): logger.warning(f"Service not authenticated: {service}") continue try: client = self._clients[service] service_output_dir = output_directory / service # Get data types for this service service_data_types = None if data_types and service in data_types: service_data_types = data_types[service] # Export data count = await client.export_data( service_output_dir, data_types=service_data_types, max_items=max_items ) results[service] = count logger.info(f"Exported {count} items from {service}") except Exception as e: logger.error(f"Failed to export from {service}", error=str(e)) results[service] = 0 total_exported = sum(results.values()) logger.info(f"Bulk export completed", total_exported=total_exported, results=results) return results async def sync_data( self, source_service: str, target_service: str, data_type: str, max_items: int = 100 ) -> int: """Synchronize data between services""" logger.info(f"Starting data sync", source=source_service, target=target_service, data_type=data_type) source_client = await self.get_client(source_service) target_client = await self.get_client(target_service) if not source_client or not target_client: logger.error("Source or target client not available") return 0 if not await self.check_authentication(source_service): logger.error(f"Source service not authenticated: {source_service}") return 0 if not await self.check_authentication(target_service): logger.error(f"Target service not authenticated: {target_service}") return 0 synced_count = 0 try: # This is a basic example - specific sync logic would depend on the services and data types # For now, we'll just demonstrate the pattern if data_type == "posts" and source_service == "twitter" and target_service == "linkedin": # Example: Sync recent tweets to LinkedIn tweets_result = await source_client.get_home_timeline(max_results=max_items) tweets = tweets_result.get('tweets', []) for tweet in tweets[:10]: # Limit to avoid spam try: # Simple cross-post (would need more sophisticated logic in practice) await target_client.create_post( text=f"From Twitter: {tweet.text[:200]}...", visibility="PUBLIC" ) synced_count += 1 # Add delay to respect rate limits await asyncio.sleep(1) except Exception as e: logger.error(f"Failed to sync tweet", tweet_id=tweet.id, error=str(e)) continue else: logger.warning(f"Sync not implemented for {source_service} -> {target_service} ({data_type})") except Exception as e: logger.error(f"Data sync failed", error=str(e)) logger.info(f"Data sync completed", synced_count=synced_count) return synced_count async def _health_check_loop(self): """Background task for periodic health checks""" while True: try: await asyncio.sleep(300) # Check every 5 minutes for service, client in self._clients.items(): try: # Check client health health = await client.get_health_status() # Update status if service in self._service_status: status = self._service_status[service] status.healthy = health.get("status") != "error" status.last_check = datetime.now(timezone.utc) status.metrics = health.get("metrics", {}) if not status.healthy: status.error_message = health.get("error", "Unknown error") else: status.error_message = None except Exception as e: logger.error(f"Health check failed for {service}", error=str(e)) if service in self._service_status: self._service_status[service].healthy = False self._service_status[service].error_message = str(e) except asyncio.CancelledError: break except Exception as e: logger.error("Health check loop error", error=str(e)) async def _token_refresh_loop(self): """Background task for periodic token refresh""" while True: try: await asyncio.sleep(3600) # Check every hour # Check for tokens that need refresh expired_count = await self.auth_manager.cleanup_expired_tokens() if expired_count > 0: logger.info(f"Cleaned up {expired_count} expired tokens") # Attempt to refresh tokens that are close to expiry rotation_count = await self.auth_manager.rotate_tokens() if rotation_count > 0: logger.info(f"Identified {rotation_count} tokens needing rotation") except asyncio.CancelledError: break except Exception as e: logger.error("Token refresh loop error", error=str(e)) async def save_configuration(self, config_path: Path): """Save client configuration to file""" config_data = { "services": { service: { "client_id": config.client_id, "scopes": config.scopes, "enabled": config.enabled, "auto_refresh": config.auto_refresh, "custom_settings": config.custom_settings # Note: client_secret is not saved for security } for service, config in self._client_configs.items() }, "saved_at": datetime.now(timezone.utc).isoformat() } with open(config_path, 'w') as f: json.dump(config_data, f, indent=2) logger.info(f"Configuration saved to {config_path}") async def load_configuration(self, config_path: Path): """Load client configuration from file""" if not config_path.exists(): logger.warning(f"Configuration file not found: {config_path}") return try: with open(config_path, 'r') as f: config_data = json.load(f) for service, service_config in config_data.get("services", {}).items(): # Client secret will need to be provided separately self._client_configs[service] = ClientConfig( client_id=service_config["client_id"], client_secret="", # Must be set separately scopes=service_config["scopes"], enabled=service_config["enabled"], auto_refresh=service_config["auto_refresh"], custom_settings=service_config["custom_settings"] ) logger.info(f"Configuration loaded from {config_path}") except Exception as e: logger.error(f"Failed to load configuration", config_path=str(config_path), error=str(e)) # Global client manager instance client_manager: Optional[APIClientManager] = None def get_client_manager() -> APIClientManager: """Get the global client manager singleton""" global client_manager if client_manager is None: client_manager = APIClientManager() return client_manager

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vitalune/Nexus-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server