Skip to main content
Glama
auth.py•23.6 kB
"""Authentication related functionality for Meta Ads API.""" from typing import Any, Dict, Optional import time import platform import pathlib import os import webbrowser import asyncio import json from .utils import logger import requests # Import from the new callback server module from .callback_server import ( start_callback_server, shutdown_callback_server, token_container, callback_server_port ) # Import the new Pipeboard authentication from .pipeboard_auth import pipeboard_auth_manager # Auth constants # Scope includes pages_show_list and pages_read_engagement to fix issue #16 # where get_account_pages failed for regular users due to missing page permissions AUTH_SCOPE = "business_management,public_profile,pages_show_list,pages_read_engagement" AUTH_REDIRECT_URI = "http://localhost:8888/callback" AUTH_RESPONSE_TYPE = "token" # Log important configuration information logger.info("Authentication module initialized") logger.info(f"Auth scope: {AUTH_SCOPE}") logger.info(f"Default redirect URI: {AUTH_REDIRECT_URI}") # Global flag for authentication state needs_authentication = False # Meta configuration singleton class MetaConfig: _instance = None def __new__(cls): if cls._instance is None: logger.debug("Creating new MetaConfig instance") cls._instance = super(MetaConfig, cls).__new__(cls) cls._instance.app_id = os.environ.get("META_APP_ID", "779761636818489") logger.info(f"MetaConfig initialized with app_id from env/default: {cls._instance.app_id}") return cls._instance def set_app_id(self, app_id): """Set the Meta App ID for API calls""" logger.info(f"Setting Meta App ID: {app_id}") self.app_id = app_id # Also update environment variable for modules that might read directly from it os.environ["META_APP_ID"] = app_id logger.debug(f"Updated META_APP_ID environment variable: {os.environ.get('META_APP_ID')}") def get_app_id(self): """Get the current Meta App ID""" # Check if we have one set if hasattr(self, 'app_id') and self.app_id: logger.debug(f"Using app_id from instance: {self.app_id}") return self.app_id # If not, try environment variable env_app_id = os.environ.get("META_APP_ID", "") if env_app_id: logger.debug(f"Using app_id from environment: {env_app_id}") # Update our instance for future use self.app_id = env_app_id return env_app_id logger.warning("No app_id found in instance or environment variables") return "" def is_configured(self): """Check if the Meta configuration is complete""" app_id = self.get_app_id() configured = bool(app_id) logger.debug(f"MetaConfig.is_configured() = {configured} (app_id: {app_id})") return configured # Create singleton instance meta_config = MetaConfig() class TokenInfo: """Stores token information including expiration""" def __init__(self, access_token: str, expires_in: Optional[int] = None, user_id: Optional[str] = None): self.access_token = access_token self.expires_in = expires_in self.user_id = user_id self.created_at = int(time.time()) logger.debug(f"TokenInfo created. Expires in: {expires_in if expires_in else 'Not specified'}") def is_expired(self) -> bool: """Check if the token is expired""" if not self.expires_in: return False # If no expiration is set, assume it's not expired current_time = int(time.time()) return current_time > (self.created_at + self.expires_in) def serialize(self) -> Dict[str, Any]: """Convert to a dictionary for storage""" return { "access_token": self.access_token, "expires_in": self.expires_in, "user_id": self.user_id, "created_at": self.created_at } @classmethod def deserialize(cls, data: Dict[str, Any]) -> 'TokenInfo': """Create from a stored dictionary""" token = cls( access_token=data.get("access_token", ""), expires_in=data.get("expires_in"), user_id=data.get("user_id") ) token.created_at = data.get("created_at", int(time.time())) return token class AuthManager: """Manages authentication with Meta APIs""" def __init__(self, app_id: str, redirect_uri: str = AUTH_REDIRECT_URI): self.app_id = app_id self.redirect_uri = redirect_uri self.token_info = None # Check for Pipeboard token first self.use_pipeboard = bool(os.environ.get("PIPEBOARD_API_TOKEN", "")) if not self.use_pipeboard: self._load_cached_token() def _get_token_cache_path(self) -> pathlib.Path: """Get the platform-specific path for token cache file""" if platform.system() == "Windows": base_path = pathlib.Path(os.environ.get("APPDATA", "")) elif platform.system() == "Darwin": # macOS base_path = pathlib.Path.home() / "Library" / "Application Support" else: # Assume Linux/Unix base_path = pathlib.Path.home() / ".config" # Create directory if it doesn't exist cache_dir = base_path / "meta-ads-mcp" cache_dir.mkdir(parents=True, exist_ok=True) return cache_dir / "token_cache.json" def _load_cached_token(self) -> bool: """Load token from cache if available""" cache_path = self._get_token_cache_path() if not cache_path.exists(): return False try: with open(cache_path, "r") as f: data = json.load(f) # Validate the cached data structure required_fields = ["access_token", "created_at"] if not all(field in data for field in required_fields): logger.warning("Cached token data is missing required fields") return False # Check if the token looks valid (basic format check) if not data.get("access_token") or len(data["access_token"]) < 20: logger.warning("Cached token appears malformed") return False self.token_info = TokenInfo.deserialize(data) # Check if token is expired if self.token_info.is_expired(): logger.info("Cached token is expired, removing cache file") # Remove the expired cache file try: cache_path.unlink() logger.info(f"Removed expired token cache: {cache_path}") except Exception as e: logger.warning(f"Could not remove expired cache file: {e}") self.token_info = None return False # Additional validation: check if token is too old (more than 60 days) current_time = int(time.time()) if self.token_info.created_at and (current_time - self.token_info.created_at) > (60 * 24 * 3600): logger.warning("Cached token is too old (more than 60 days), removing cache file") try: cache_path.unlink() logger.info(f"Removed old token cache: {cache_path}") except Exception as e: logger.warning(f"Could not remove old cache file: {e}") self.token_info = None return False logger.info(f"Loaded cached token (expires in {(self.token_info.created_at + self.token_info.expires_in) - int(time.time())} seconds)") return True except Exception as e: logger.error(f"Error loading cached token: {e}") # If there's any error reading the cache, try to remove the corrupted file try: cache_path.unlink() logger.info(f"Removed corrupted token cache: {cache_path}") except Exception as cleanup_error: logger.warning(f"Could not remove corrupted cache file: {cleanup_error}") return False def _save_token_to_cache(self) -> None: """Save token to cache file""" if not self.token_info: return cache_path = self._get_token_cache_path() try: with open(cache_path, "w") as f: json.dump(self.token_info.serialize(), f) logger.info(f"Token cached at: {cache_path}") except Exception as e: logger.error(f"Error saving token to cache: {e}") def get_auth_url(self) -> str: """Generate the Facebook OAuth URL for desktop app flow""" return ( f"https://www.facebook.com/v22.0/dialog/oauth?" f"client_id={self.app_id}&" f"redirect_uri={self.redirect_uri}&" f"scope={AUTH_SCOPE}&" f"response_type={AUTH_RESPONSE_TYPE}" ) def authenticate(self, force_refresh: bool = False) -> Optional[str]: """ Authenticate with Meta APIs Args: force_refresh: Force token refresh even if cached token exists Returns: Access token if successful, None otherwise """ # If Pipeboard auth is available, use that instead if self.use_pipeboard: logger.info("Using Pipeboard authentication") return pipeboard_auth_manager.get_access_token(force_refresh=force_refresh) # Otherwise, use the original OAuth flow # Check if we already have a valid token if not force_refresh and self.token_info and not self.token_info.is_expired(): return self.token_info.access_token # Start the callback server if not already running try: port = start_callback_server() # Update redirect URI with the actual port self.redirect_uri = f"http://localhost:{port}/callback" # Generate the auth URL auth_url = self.get_auth_url() # Open browser with auth URL logger.info(f"Opening browser with URL: {auth_url}") webbrowser.open(auth_url) # We don't wait for the token here anymore # The token will be processed by the callback server # Just return None to indicate we've started the flow return None except Exception as e: logger.error(f"Failed to start callback server: {e}") logger.info("Callback server disabled. OAuth authentication flow cannot be used.") return None def get_access_token(self) -> Optional[str]: """ Get the current access token, refreshing if necessary Returns: Access token if available, None otherwise """ # If using Pipeboard, always delegate to the Pipeboard auth manager if self.use_pipeboard: return pipeboard_auth_manager.get_access_token() if not self.token_info or self.token_info.is_expired(): return None return self.token_info.access_token def invalidate_token(self) -> None: """Invalidate the current token, usually because it has expired or is invalid""" # If using Pipeboard, delegate to the Pipeboard auth manager if self.use_pipeboard: pipeboard_auth_manager.invalidate_token() return if self.token_info: logger.info(f"Invalidating token: {self.token_info.access_token[:10]}...") self.token_info = None # Signal that authentication is needed global needs_authentication needs_authentication = True # Remove the cached token file try: cache_path = self._get_token_cache_path() if cache_path.exists(): os.remove(cache_path) logger.info(f"Removed cached token file: {cache_path}") except Exception as e: logger.error(f"Error removing cached token file: {e}") def clear_token(self) -> None: """Alias for invalidate_token for consistency with other APIs""" self.invalidate_token() def process_token_response(token_container): """Process the token response from Facebook.""" global needs_authentication, auth_manager if token_container and token_container.get('token'): logger.info("Processing token response from Facebook OAuth") # Exchange the short-lived token for a long-lived token short_lived_token = token_container['token'] long_lived_token_info = exchange_token_for_long_lived(short_lived_token) if long_lived_token_info: logger.info(f"Successfully exchanged for long-lived token (expires in {long_lived_token_info.expires_in} seconds)") try: auth_manager.token_info = long_lived_token_info logger.info(f"Long-lived token info set in auth_manager, expires in {long_lived_token_info.expires_in} seconds") except NameError: logger.error("auth_manager not defined when trying to process token") try: logger.info("Attempting to save long-lived token to cache") auth_manager._save_token_to_cache() logger.info(f"Long-lived token successfully saved to cache at {auth_manager._get_token_cache_path()}") except Exception as e: logger.error(f"Error saving token to cache: {e}") needs_authentication = False return True else: # Fall back to the short-lived token if exchange fails logger.warning("Failed to exchange for long-lived token, using short-lived token instead") token_info = TokenInfo( access_token=short_lived_token, expires_in=token_container.get('expires_in', 0) ) try: auth_manager.token_info = token_info logger.info(f"Short-lived token info set in auth_manager, expires in {token_info.expires_in} seconds") except NameError: logger.error("auth_manager not defined when trying to process token") try: logger.info("Attempting to save token to cache") auth_manager._save_token_to_cache() logger.info(f"Token successfully saved to cache at {auth_manager._get_token_cache_path()}") except Exception as e: logger.error(f"Error saving token to cache: {e}") needs_authentication = False return True else: logger.warning("Received empty token in process_token_response") needs_authentication = True return False def exchange_token_for_long_lived(short_lived_token): """ Exchange a short-lived token for a long-lived token (60 days validity). Args: short_lived_token: The short-lived access token received from OAuth flow Returns: TokenInfo object with the long-lived token, or None if exchange failed """ logger.info("Attempting to exchange short-lived token for long-lived token") try: # Get the app ID from the configuration app_id = meta_config.get_app_id() # Get the app secret - this should be securely stored app_secret = os.environ.get("META_APP_SECRET", "") if not app_id or not app_secret: logger.error("Missing app_id or app_secret for token exchange") return None # Make the API request to exchange the token url = "https://graph.facebook.com/v22.0/oauth/access_token" params = { "grant_type": "fb_exchange_token", "client_id": app_id, "client_secret": app_secret, "fb_exchange_token": short_lived_token } logger.debug(f"Making token exchange request to {url}") response = requests.get(url, params=params) if response.status_code == 200: data = response.json() logger.debug(f"Token exchange response: {data}") # Create TokenInfo from the response # The response includes access_token and expires_in (in seconds) new_token = data.get("access_token") expires_in = data.get("expires_in") if new_token: logger.info(f"Received long-lived token, expires in {expires_in} seconds (~{expires_in//86400} days)") return TokenInfo( access_token=new_token, expires_in=expires_in ) else: logger.error("No access_token in exchange response") return None else: logger.error(f"Token exchange failed with status {response.status_code}: {response.text}") return None except Exception as e: logger.error(f"Error exchanging token: {e}") return None async def get_current_access_token() -> Optional[str]: """Get the current access token from auth manager""" # Check for environment variable first - this takes highest precedence env_token = os.environ.get("META_ACCESS_TOKEN") if env_token: logger.debug("Using access token from META_ACCESS_TOKEN environment variable") # Basic validation if len(env_token) < 20: # Most Meta tokens are much longer logger.error(f"TOKEN VALIDATION FAILED: Token from environment variable appears malformed (length: {len(env_token)})") return None return env_token # Use the singleton auth manager global auth_manager # Log the function call and current app ID logger.debug("get_current_access_token() called") app_id = meta_config.get_app_id() logger.debug(f"Current app_id: {app_id}") # Check if using Pipeboard authentication using_pipeboard = auth_manager.use_pipeboard # Check if app_id is valid - but only if not using Pipeboard authentication if not app_id and not using_pipeboard: logger.error("TOKEN VALIDATION FAILED: No valid app_id configured") logger.error("Please set META_APP_ID environment variable or configure via meta_config.set_app_id()") return None # Attempt to get access token try: token = auth_manager.get_access_token() if token: # Add basic token validation - check if it looks like a valid token if len(token) < 20: # Most Meta tokens are much longer logger.error(f"TOKEN VALIDATION FAILED: Token appears malformed (length: {len(token)})") auth_manager.invalidate_token() return None logger.debug(f"Access token found in auth_manager (starts with: {token[:10]}...)") return token else: logger.warning("No valid access token available in auth_manager") # Check why token might be missing if hasattr(auth_manager, 'token_info') and auth_manager.token_info: if auth_manager.token_info.is_expired(): logger.error("TOKEN VALIDATION FAILED: Token is expired") # Add expiration details if hasattr(auth_manager.token_info, 'expires_in') and auth_manager.token_info.expires_in: expiry_time = auth_manager.token_info.created_at + auth_manager.token_info.expires_in current_time = int(time.time()) expired_seconds_ago = current_time - expiry_time logger.error(f"Token expired {expired_seconds_ago} seconds ago") elif not auth_manager.token_info.access_token: logger.error("TOKEN VALIDATION FAILED: Token object exists but access_token is empty") else: logger.error("TOKEN VALIDATION FAILED: Token exists but was rejected for unknown reason") else: logger.error("TOKEN VALIDATION FAILED: No token information available") # Suggest next steps for troubleshooting logger.error("To fix: Try re-authenticating or check if your token has been revoked") return None except Exception as e: logger.error(f"Error getting access token: {str(e)}") import traceback logger.error(f"Token validation stacktrace: {traceback.format_exc()}") return None def login(): """ Start the login flow to authenticate with Meta """ print("Starting Meta Ads authentication flow...") try: # Start the callback server first try: port = start_callback_server() except Exception as callback_error: print(f"Error: {callback_error}") print("Callback server is disabled. Please use alternative authentication methods:") print("- Set PIPEBOARD_API_TOKEN environment variable for Pipeboard authentication") print("- Or provide a direct META_ACCESS_TOKEN environment variable") return # Get the auth URL and open the browser auth_url = auth_manager.get_auth_url() print(f"Opening browser with URL: {auth_url}") webbrowser.open(auth_url) # Wait for token to be received print("Waiting for authentication to complete...") max_wait = 300 # 5 minutes wait_interval = 2 # 2 seconds for _ in range(max_wait // wait_interval): if token_container["token"]: token = token_container["token"] print("Authentication successful!") # Verify token works by getting basic user info try: from .api import make_api_request result = asyncio.run(make_api_request("me", token, {})) print(f"Authenticated as: {result.get('name', 'Unknown')} (ID: {result.get('id', 'Unknown')})") return except Exception as e: print(f"Warning: Could not verify token: {e}") return time.sleep(wait_interval) print("Authentication timed out. Please try again.") except Exception as e: print(f"Error during authentication: {e}") print(f"Direct authentication URL: {auth_manager.get_auth_url()}") print("You can manually open this URL in your browser to complete authentication.") # Initialize auth manager with a placeholder - will be updated at runtime META_APP_ID = os.environ.get("META_APP_ID", "YOUR_META_APP_ID") # Create the auth manager auth_manager = AuthManager(META_APP_ID)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pipeboard-co/meta-ads-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server