Skip to main content
Glama
vitalune

Personal Knowledge Assistant

by vitalune
twitter_client.py44.1 kB
""" Twitter/X API v2 Client Implementation This module provides comprehensive Twitter API v2 integration with: - OAuth2 with PKCE authentication flow - Tweet operations (read, post, delete, retweet, like) - User profile management and following - Timeline operations (home, user, mentions) - Media upload support for images and videos - Analytics and engagement metrics - Rate limiting for different endpoint tiers - Support for both app-only and user context - Real-time streaming capabilities """ import asyncio import hashlib import json import secrets import urllib.parse from datetime import datetime, timezone, timedelta from typing import Dict, Any, List, Optional, Union, AsyncIterator from dataclasses import dataclass, field from pathlib import Path import base64 import httpx import structlog from .base_client import BaseAPIClient, RateLimitConfig, CircuitBreakerConfig from ..config.auth import AuthProvider, TokenType from ..utils.rate_limiter import get_rate_limit_manager logger = structlog.get_logger(__name__) @dataclass class TwitterUser: """Twitter user data structure""" id: str username: str name: str created_at: Optional[datetime] = None description: Optional[str] = None location: Optional[str] = None url: Optional[str] = None verified: bool = False protected: bool = False profile_image_url: Optional[str] = None public_metrics: Dict[str, int] = field(default_factory=dict) pinned_tweet_id: Optional[str] = None @classmethod def from_api_response(cls, data: Dict[str, Any]) -> 'TwitterUser': """Create TwitterUser from API response""" created_at = None if 'created_at' in data: created_at = datetime.fromisoformat( data['created_at'].replace('Z', '+00:00') ) return cls( id=data['id'], username=data['username'], name=data['name'], created_at=created_at, description=data.get('description'), location=data.get('location'), url=data.get('url'), verified=data.get('verified', False), protected=data.get('protected', False), profile_image_url=data.get('profile_image_url'), public_metrics=data.get('public_metrics', {}), pinned_tweet_id=data.get('pinned_tweet_id') ) @dataclass class TwitterTweet: """Twitter tweet data structure""" id: str text: str created_at: Optional[datetime] = None author_id: Optional[str] = None conversation_id: Optional[str] = None in_reply_to_user_id: Optional[str] = None referenced_tweets: List[Dict[str, Any]] = field(default_factory=list) attachments: Dict[str, List[str]] = field(default_factory=dict) public_metrics: Dict[str, int] = field(default_factory=dict) organic_metrics: Dict[str, int] = field(default_factory=dict) promoted_metrics: Dict[str, int] = field(default_factory=dict) non_public_metrics: Dict[str, int] = field(default_factory=dict) context_annotations: List[Dict[str, Any]] = field(default_factory=list) entities: Dict[str, Any] = field(default_factory=dict) geo: Optional[Dict[str, Any]] = None lang: Optional[str] = None possibly_sensitive: bool = False reply_settings: Optional[str] = None source: Optional[str] = None @classmethod def from_api_response(cls, data: Dict[str, Any]) -> 'TwitterTweet': """Create TwitterTweet from API response""" created_at = None if 'created_at' in data: created_at = datetime.fromisoformat( data['created_at'].replace('Z', '+00:00') ) return cls( id=data['id'], text=data['text'], created_at=created_at, author_id=data.get('author_id'), conversation_id=data.get('conversation_id'), in_reply_to_user_id=data.get('in_reply_to_user_id'), referenced_tweets=data.get('referenced_tweets', []), attachments=data.get('attachments', {}), public_metrics=data.get('public_metrics', {}), organic_metrics=data.get('organic_metrics', {}), promoted_metrics=data.get('promoted_metrics', {}), non_public_metrics=data.get('non_public_metrics', {}), context_annotations=data.get('context_annotations', []), entities=data.get('entities', {}), geo=data.get('geo'), lang=data.get('lang'), possibly_sensitive=data.get('possibly_sensitive', False), reply_settings=data.get('reply_settings'), source=data.get('source') ) @dataclass class TwitterMedia: """Twitter media data structure""" media_key: str type: str # photo, video, animated_gif url: Optional[str] = None preview_image_url: Optional[str] = None width: Optional[int] = None height: Optional[int] = None duration_ms: Optional[int] = None public_metrics: Dict[str, int] = field(default_factory=dict) non_public_metrics: Dict[str, int] = field(default_factory=dict) organic_metrics: Dict[str, int] = field(default_factory=dict) promoted_metrics: Dict[str, int] = field(default_factory=dict) alt_text: Optional[str] = None variants: List[Dict[str, Any]] = field(default_factory=list) @classmethod def from_api_response(cls, data: Dict[str, Any]) -> 'TwitterMedia': """Create TwitterMedia from API response""" return cls( media_key=data['media_key'], type=data['type'], url=data.get('url'), preview_image_url=data.get('preview_image_url'), width=data.get('width'), height=data.get('height'), duration_ms=data.get('duration_ms'), public_metrics=data.get('public_metrics', {}), non_public_metrics=data.get('non_public_metrics', {}), organic_metrics=data.get('organic_metrics', {}), promoted_metrics=data.get('promoted_metrics', {}), alt_text=data.get('alt_text'), variants=data.get('variants', []) ) class TwitterClient(BaseAPIClient): """Twitter API v2 client with OAuth2 PKCE authentication""" def __init__( self, client_id: str, client_secret: str, scopes: Optional[List[str]] = None, **kwargs ): # Twitter-specific rate limiting (varies by endpoint) rate_limit_config = RateLimitConfig( requests_per_minute=300, # Most endpoints requests_per_hour=18000, requests_per_day=432000, burst_size=5 ) super().__init__( provider=AuthProvider.TWITTER, base_url="https://api.twitter.com/2", rate_limit_config=rate_limit_config, **kwargs ) self.client_id = client_id self.client_secret = client_secret self.scopes = scopes or [ 'tweet.read', 'users.read', 'bookmark.read', 'like.read', 'follows.read', 'offline.access' ] self.rate_limiter = get_rate_limit_manager() # OAuth2 PKCE parameters self._code_verifier: Optional[str] = None self._code_challenge: Optional[str] = None # User context (filled after authentication) self._authenticated_user: Optional[TwitterUser] = None def _format_auth_header(self, token: str) -> Dict[str, str]: """Format authentication header for Twitter API""" return {"Authorization": f"Bearer {token}"} def _generate_pkce_params(self) -> tuple[str, str]: """Generate PKCE code verifier and challenge""" # Generate code verifier (43-128 characters) code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).decode('utf-8').rstrip('=') # Generate code challenge (SHA256 hash of verifier) code_challenge = base64.urlsafe_b64encode( hashlib.sha256(code_verifier.encode('utf-8')).digest() ).decode('utf-8').rstrip('=') return code_verifier, code_challenge async def authenticate(self, redirect_uri: str = "http://localhost:8080/oauth/callback") -> bool: """Authenticate with Twitter using OAuth2 with PKCE""" try: # Generate PKCE parameters self._code_verifier, self._code_challenge = self._generate_pkce_params() # Build authorization URL auth_params = { 'response_type': 'code', 'client_id': self.client_id, 'redirect_uri': redirect_uri, 'scope': ' '.join(self.scopes), 'state': secrets.token_urlsafe(32), 'code_challenge': self._code_challenge, 'code_challenge_method': 'S256' } auth_url = "https://twitter.com/i/oauth2/authorize?" + urllib.parse.urlencode(auth_params) # Store session state session_data = self.auth_manager.create_session_state( provider=self.provider, redirect_uri=redirect_uri, scopes=self.scopes ) logger.info( "Twitter OAuth flow initiated", auth_url=auth_url, state=auth_params['state'][:8] + "...", scopes=self.scopes ) raise Exception(f"Please visit this URL to authorize: {auth_url}") except Exception as e: logger.error(f"Twitter authentication failed", error=str(e)) return False async def handle_oauth_callback(self, authorization_code: str, state: str) -> bool: """Handle OAuth callback with authorization code""" try: # Validate session state session_data = self.auth_manager.consume_session_state(state) if not session_data: raise ValueError("Invalid or expired OAuth state") # Exchange authorization code for tokens token_data = { 'grant_type': 'authorization_code', 'client_id': self.client_id, 'code': authorization_code, 'redirect_uri': session_data["redirect_uri"], 'code_verifier': self._code_verifier } # Basic auth for client credentials auth_header = base64.b64encode( f"{self.client_id}:{self.client_secret}".encode() ).decode() headers = { 'Authorization': f'Basic {auth_header}', 'Content-Type': 'application/x-www-form-urlencoded' } response = await self._make_request( method="POST", endpoint="https://api.twitter.com/2/oauth2/token", data=token_data, headers=headers, authenticated=False ) token_response = response.json() # Store tokens securely access_token_id = await self.auth_manager.store_token( provider=self.provider, token_type=TokenType.ACCESS_TOKEN, token_value=token_response['access_token'], expires_in=token_response.get('expires_in', 7200), # Default 2 hours scopes=self.scopes, subject=None, # Will be populated after getting user info client_id=self.client_id ) if 'refresh_token' in token_response: refresh_token_id = await self.auth_manager.store_token( provider=self.provider, token_type=TokenType.REFRESH_TOKEN, token_value=token_response['refresh_token'], expires_in=None, # Refresh tokens don't expire scopes=self.scopes, subject=None, client_id=self.client_id ) # Get authenticated user info await self._load_authenticated_user() logger.info( "Twitter authentication successful", access_token_id=access_token_id, user_id=self._authenticated_user.id if self._authenticated_user else None, scopes=self.scopes ) return True except Exception as e: logger.error(f"Twitter OAuth callback failed", error=str(e)) return False async def refresh_token(self) -> bool: """Refresh Twitter access token""" try: # Find refresh token tokens = await self.auth_manager.list_tokens( provider=self.provider, token_type=TokenType.REFRESH_TOKEN ) if not tokens: logger.error("No refresh token found for Twitter") return False refresh_token = await self.auth_manager.retrieve_token(tokens[0]["token_id"]) if not refresh_token: logger.error("Failed to retrieve Twitter refresh token") return False # Refresh token request token_data = { 'grant_type': 'refresh_token', 'refresh_token': refresh_token, 'client_id': self.client_id } auth_header = base64.b64encode( f"{self.client_id}:{self.client_secret}".encode() ).decode() headers = { 'Authorization': f'Basic {auth_header}', 'Content-Type': 'application/x-www-form-urlencoded' } response = await self._make_request( method="POST", endpoint="https://api.twitter.com/2/oauth2/token", data=token_data, headers=headers, authenticated=False ) token_response = response.json() # Store new access token access_token_id = await self.auth_manager.store_token( provider=self.provider, token_type=TokenType.ACCESS_TOKEN, token_value=token_response['access_token'], expires_in=token_response.get('expires_in', 7200), scopes=self.scopes, subject=tokens[0].get("subject"), client_id=self.client_id ) # Store new refresh token if provided if 'refresh_token' in token_response: await self.auth_manager.store_token( provider=self.provider, token_type=TokenType.REFRESH_TOKEN, token_value=token_response['refresh_token'], expires_in=None, scopes=self.scopes, subject=tokens[0].get("subject"), client_id=self.client_id ) logger.info("Twitter token refreshed successfully", token_id=access_token_id) return True except Exception as e: logger.error(f"Twitter token refresh failed", error=str(e)) return False async def _load_authenticated_user(self): """Load authenticated user information""" try: user_data = await self.get_me() self._authenticated_user = TwitterUser.from_api_response(user_data) except Exception as e: logger.warning(f"Failed to load authenticated user", error=str(e)) async def get_rate_limits(self) -> Dict[str, Any]: """Get current rate limit status for Twitter""" status = await self.rate_limiter.get_all_status() return status.get("twitter", {}) async def get_me(self) -> Dict[str, Any]: """Get authenticated user information""" status = await self.rate_limiter.is_allowed("twitter", "users") if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: params = { 'user.fields': 'created_at,description,location,pinned_tweet_id,profile_image_url,protected,public_metrics,url,username,verified' } response = await self._make_request( method="GET", endpoint="/users/me", params=params ) await self.rate_limiter.record_response("twitter", "users", True) return response.json()['data'] except Exception as e: await self.rate_limiter.record_response("twitter", "users", False) logger.error(f"Failed to get Twitter user info", error=str(e)) raise async def get_user_by_username(self, username: str) -> TwitterUser: """Get user by username""" status = await self.rate_limiter.is_allowed("twitter", "users") if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: params = { 'user.fields': 'created_at,description,location,pinned_tweet_id,profile_image_url,protected,public_metrics,url,username,verified' } response = await self._make_request( method="GET", endpoint=f"/users/by/username/{username}", params=params ) await self.rate_limiter.record_response("twitter", "users", True) return TwitterUser.from_api_response(response.json()['data']) except Exception as e: await self.rate_limiter.record_response("twitter", "users", False) logger.error(f"Failed to get Twitter user", username=username, error=str(e)) raise async def get_user_by_id(self, user_id: str) -> TwitterUser: """Get user by ID""" status = await self.rate_limiter.is_allowed("twitter", "users") if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: params = { 'user.fields': 'created_at,description,location,pinned_tweet_id,profile_image_url,protected,public_metrics,url,username,verified' } response = await self._make_request( method="GET", endpoint=f"/users/{user_id}", params=params ) await self.rate_limiter.record_response("twitter", "users", True) return TwitterUser.from_api_response(response.json()['data']) except Exception as e: await self.rate_limiter.record_response("twitter", "users", False) logger.error(f"Failed to get Twitter user", user_id=user_id, error=str(e)) raise async def get_tweet(self, tweet_id: str, include_metrics: bool = True) -> TwitterTweet: """Get a specific tweet""" status = await self.rate_limiter.is_allowed("twitter", "tweets") if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: params = { 'tweet.fields': 'attachments,author_id,context_annotations,conversation_id,created_at,entities,geo,id,in_reply_to_user_id,lang,possibly_sensitive,referenced_tweets,reply_settings,source,text', 'expansions': 'author_id,referenced_tweets.id,attachments.media_keys', 'user.fields': 'created_at,description,location,profile_image_url,protected,public_metrics,url,username,verified', 'media.fields': 'duration_ms,height,media_key,preview_image_url,type,url,width,alt_text' } if include_metrics: params['tweet.fields'] += ',public_metrics,organic_metrics,promoted_metrics,non_public_metrics' params['media.fields'] += ',public_metrics,organic_metrics,promoted_metrics,non_public_metrics' response = await self._make_request( method="GET", endpoint=f"/tweets/{tweet_id}", params=params ) await self.rate_limiter.record_response("twitter", "tweets", True) return TwitterTweet.from_api_response(response.json()['data']) except Exception as e: await self.rate_limiter.record_response("twitter", "tweets", False) logger.error(f"Failed to get tweet", tweet_id=tweet_id, error=str(e)) raise async def get_user_tweets( self, user_id: str, max_results: int = 10, pagination_token: Optional[str] = None, exclude_replies: bool = True, exclude_retweets: bool = True ) -> Dict[str, Any]: """Get tweets from a specific user""" status = await self.rate_limiter.is_allowed("twitter", "tweets") if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: params = { 'max_results': min(max_results, 100), # Twitter limit 'tweet.fields': 'attachments,author_id,context_annotations,conversation_id,created_at,entities,geo,id,in_reply_to_user_id,lang,possibly_sensitive,public_metrics,referenced_tweets,reply_settings,source,text', 'expansions': 'attachments.media_keys,referenced_tweets.id', 'exclude': [] } if exclude_replies: params['exclude'].append('replies') if exclude_retweets: params['exclude'].append('retweets') if params['exclude']: params['exclude'] = ','.join(params['exclude']) else: del params['exclude'] if pagination_token: params['pagination_token'] = pagination_token response = await self._make_request( method="GET", endpoint=f"/users/{user_id}/tweets", params=params ) await self.rate_limiter.record_response("twitter", "tweets", True) result = response.json() tweets = [TwitterTweet.from_api_response(tweet_data) for tweet_data in result.get('data', [])] return { 'tweets': tweets, 'next_token': result.get('meta', {}).get('next_token'), 'result_count': result.get('meta', {}).get('result_count', 0) } except Exception as e: await self.rate_limiter.record_response("twitter", "tweets", False) logger.error(f"Failed to get user tweets", user_id=user_id, error=str(e)) raise async def get_home_timeline( self, max_results: int = 10, pagination_token: Optional[str] = None, exclude_replies: bool = True ) -> Dict[str, Any]: """Get home timeline for authenticated user""" status = await self.rate_limiter.is_allowed("twitter", "timeline", user_id=self._authenticated_user.id if self._authenticated_user else None) if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: params = { 'max_results': min(max_results, 100), 'tweet.fields': 'attachments,author_id,context_annotations,conversation_id,created_at,entities,geo,id,in_reply_to_user_id,lang,possibly_sensitive,public_metrics,referenced_tweets,reply_settings,source,text', 'expansions': 'author_id,attachments.media_keys,referenced_tweets.id', 'user.fields': 'created_at,description,location,profile_image_url,protected,public_metrics,url,username,verified' } if exclude_replies: params['exclude'] = 'replies' if pagination_token: params['pagination_token'] = pagination_token response = await self._make_request( method="GET", endpoint="/users/me/timelines/reverse_chronological", params=params ) await self.rate_limiter.record_response("twitter", "timeline", True, user_id=self._authenticated_user.id if self._authenticated_user else None) result = response.json() tweets = [TwitterTweet.from_api_response(tweet_data) for tweet_data in result.get('data', [])] return { 'tweets': tweets, 'next_token': result.get('meta', {}).get('next_token'), 'result_count': result.get('meta', {}).get('result_count', 0), 'includes': result.get('includes', {}) } except Exception as e: await self.rate_limiter.record_response("twitter", "timeline", False, user_id=self._authenticated_user.id if self._authenticated_user else None) logger.error(f"Failed to get home timeline", error=str(e)) raise async def search_tweets( self, query: str, max_results: int = 10, next_token: Optional[str] = None, sort_order: str = "relevancy" ) -> Dict[str, Any]: """Search for tweets""" status = await self.rate_limiter.is_allowed("twitter", "search") if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: params = { 'query': query, 'max_results': min(max_results, 100), 'sort_order': sort_order, 'tweet.fields': 'attachments,author_id,context_annotations,conversation_id,created_at,entities,geo,id,in_reply_to_user_id,lang,possibly_sensitive,public_metrics,referenced_tweets,reply_settings,source,text', 'expansions': 'author_id,attachments.media_keys,referenced_tweets.id', 'user.fields': 'created_at,description,location,profile_image_url,protected,public_metrics,url,username,verified' } if next_token: params['next_token'] = next_token response = await self._make_request( method="GET", endpoint="/tweets/search/recent", params=params ) await self.rate_limiter.record_response("twitter", "search", True) result = response.json() tweets = [TwitterTweet.from_api_response(tweet_data) for tweet_data in result.get('data', [])] return { 'tweets': tweets, 'next_token': result.get('meta', {}).get('next_token'), 'result_count': result.get('meta', {}).get('result_count', 0), 'includes': result.get('includes', {}) } except Exception as e: await self.rate_limiter.record_response("twitter", "search", False) logger.error(f"Failed to search tweets", query=query, error=str(e)) raise async def get_bookmarks( self, max_results: int = 10, pagination_token: Optional[str] = None ) -> Dict[str, Any]: """Get user's bookmarked tweets""" status = await self.rate_limiter.is_allowed("twitter", "bookmarks", user_id=self._authenticated_user.id if self._authenticated_user else None) if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: user_id = self._authenticated_user.id if self._authenticated_user else 'me' params = { 'max_results': min(max_results, 100), 'tweet.fields': 'attachments,author_id,context_annotations,conversation_id,created_at,entities,geo,id,in_reply_to_user_id,lang,possibly_sensitive,public_metrics,referenced_tweets,reply_settings,source,text', 'expansions': 'author_id,attachments.media_keys,referenced_tweets.id', 'user.fields': 'created_at,description,location,profile_image_url,protected,public_metrics,url,username,verified' } if pagination_token: params['pagination_token'] = pagination_token response = await self._make_request( method="GET", endpoint=f"/users/{user_id}/bookmarks", params=params ) await self.rate_limiter.record_response("twitter", "bookmarks", True, user_id=self._authenticated_user.id if self._authenticated_user else None) result = response.json() tweets = [TwitterTweet.from_api_response(tweet_data) for tweet_data in result.get('data', [])] return { 'tweets': tweets, 'next_token': result.get('meta', {}).get('next_token'), 'result_count': result.get('meta', {}).get('result_count', 0), 'includes': result.get('includes', {}) } except Exception as e: await self.rate_limiter.record_response("twitter", "bookmarks", False, user_id=self._authenticated_user.id if self._authenticated_user else None) logger.error(f"Failed to get bookmarks", error=str(e)) raise async def get_liked_tweets( self, user_id: Optional[str] = None, max_results: int = 10, pagination_token: Optional[str] = None ) -> Dict[str, Any]: """Get tweets liked by user""" if not user_id: user_id = self._authenticated_user.id if self._authenticated_user else 'me' status = await self.rate_limiter.is_allowed("twitter", "likes", user_id=user_id) if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: params = { 'max_results': min(max_results, 100), 'tweet.fields': 'attachments,author_id,context_annotations,conversation_id,created_at,entities,geo,id,in_reply_to_user_id,lang,possibly_sensitive,public_metrics,referenced_tweets,reply_settings,source,text', 'expansions': 'author_id,attachments.media_keys,referenced_tweets.id', 'user.fields': 'created_at,description,location,profile_image_url,protected,public_metrics,url,username,verified' } if pagination_token: params['pagination_token'] = pagination_token response = await self._make_request( method="GET", endpoint=f"/users/{user_id}/liked_tweets", params=params ) await self.rate_limiter.record_response("twitter", "likes", True, user_id=user_id) result = response.json() tweets = [TwitterTweet.from_api_response(tweet_data) for tweet_data in result.get('data', [])] return { 'tweets': tweets, 'next_token': result.get('meta', {}).get('next_token'), 'result_count': result.get('meta', {}).get('result_count', 0), 'includes': result.get('includes', {}) } except Exception as e: await self.rate_limiter.record_response("twitter", "likes", False, user_id=user_id) logger.error(f"Failed to get liked tweets", user_id=user_id, error=str(e)) raise async def get_following( self, user_id: Optional[str] = None, max_results: int = 100, pagination_token: Optional[str] = None ) -> Dict[str, Any]: """Get users that a user is following""" if not user_id: user_id = self._authenticated_user.id if self._authenticated_user else 'me' status = await self.rate_limiter.is_allowed("twitter", "following", user_id=user_id) if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: params = { 'max_results': min(max_results, 1000), 'user.fields': 'created_at,description,location,profile_image_url,protected,public_metrics,url,username,verified' } if pagination_token: params['pagination_token'] = pagination_token response = await self._make_request( method="GET", endpoint=f"/users/{user_id}/following", params=params ) await self.rate_limiter.record_response("twitter", "following", True, user_id=user_id) result = response.json() users = [TwitterUser.from_api_response(user_data) for user_data in result.get('data', [])] return { 'users': users, 'next_token': result.get('meta', {}).get('next_token'), 'result_count': result.get('meta', {}).get('result_count', 0) } except Exception as e: await self.rate_limiter.record_response("twitter", "following", False, user_id=user_id) logger.error(f"Failed to get following", user_id=user_id, error=str(e)) raise async def get_followers( self, user_id: Optional[str] = None, max_results: int = 100, pagination_token: Optional[str] = None ) -> Dict[str, Any]: """Get followers of a user""" if not user_id: user_id = self._authenticated_user.id if self._authenticated_user else 'me' status = await self.rate_limiter.is_allowed("twitter", "followers", user_id=user_id) if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: params = { 'max_results': min(max_results, 1000), 'user.fields': 'created_at,description,location,profile_image_url,protected,public_metrics,url,username,verified' } if pagination_token: params['pagination_token'] = pagination_token response = await self._make_request( method="GET", endpoint=f"/users/{user_id}/followers", params=params ) await self.rate_limiter.record_response("twitter", "followers", True, user_id=user_id) result = response.json() users = [TwitterUser.from_api_response(user_data) for user_data in result.get('data', [])] return { 'users': users, 'next_token': result.get('meta', {}).get('next_token'), 'result_count': result.get('meta', {}).get('result_count', 0) } except Exception as e: await self.rate_limiter.record_response("twitter", "followers", False, user_id=user_id) logger.error(f"Failed to get followers", user_id=user_id, error=str(e)) raise async def post_tweet( self, text: str, reply_to_tweet_id: Optional[str] = None, media_ids: Optional[List[str]] = None, poll_options: Optional[List[str]] = None, poll_duration_minutes: int = 1440 # 24 hours ) -> TwitterTweet: """Post a new tweet""" # Note: Requires write permissions status = await self.rate_limiter.is_allowed("twitter", "post_tweet", user_id=self._authenticated_user.id if self._authenticated_user else None) if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: tweet_data = { 'text': text } if reply_to_tweet_id: tweet_data['reply'] = {'in_reply_to_tweet_id': reply_to_tweet_id} if media_ids: tweet_data['media'] = {'media_ids': media_ids} if poll_options: tweet_data['poll'] = { 'options': poll_options, 'duration_minutes': poll_duration_minutes } response = await self._make_request( method="POST", endpoint="/tweets", data=tweet_data ) await self.rate_limiter.record_response("twitter", "post_tweet", True, user_id=self._authenticated_user.id if self._authenticated_user else None) return TwitterTweet.from_api_response(response.json()['data']) except Exception as e: await self.rate_limiter.record_response("twitter", "post_tweet", False, user_id=self._authenticated_user.id if self._authenticated_user else None) logger.error(f"Failed to post tweet", error=str(e)) raise async def delete_tweet(self, tweet_id: str) -> bool: """Delete a tweet""" # Note: Requires write permissions status = await self.rate_limiter.is_allowed("twitter", "delete_tweet", user_id=self._authenticated_user.id if self._authenticated_user else None) if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: response = await self._make_request( method="DELETE", endpoint=f"/tweets/{tweet_id}" ) await self.rate_limiter.record_response("twitter", "delete_tweet", True, user_id=self._authenticated_user.id if self._authenticated_user else None) result = response.json() return result.get('data', {}).get('deleted', False) except Exception as e: await self.rate_limiter.record_response("twitter", "delete_tweet", False, user_id=self._authenticated_user.id if self._authenticated_user else None) logger.error(f"Failed to delete tweet", tweet_id=tweet_id, error=str(e)) return False async def upload_media(self, media_path: Path, alt_text: Optional[str] = None) -> str: """Upload media for tweets""" # Note: Uses Twitter API v1.1 for media upload status = await self.rate_limiter.is_allowed("twitter", "upload_media", user_id=self._authenticated_user.id if self._authenticated_user else None) if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: # Read media file with open(media_path, 'rb') as f: media_data = f.read() # Upload media using v1.1 endpoint files = {'media': (media_path.name, media_data)} response = await self._make_request( method="POST", endpoint="https://upload.twitter.com/1.1/media/upload.json", files=files, authenticated=True ) result = response.json() media_id = result['media_id_string'] # Add alt text if provided if alt_text: await self._add_media_metadata(media_id, alt_text) await self.rate_limiter.record_response("twitter", "upload_media", True, user_id=self._authenticated_user.id if self._authenticated_user else None) return media_id except Exception as e: await self.rate_limiter.record_response("twitter", "upload_media", False, user_id=self._authenticated_user.id if self._authenticated_user else None) logger.error(f"Failed to upload media", media_path=str(media_path), error=str(e)) raise async def _add_media_metadata(self, media_id: str, alt_text: str): """Add metadata to uploaded media""" metadata = { 'media_id': media_id, 'alt_text': {'text': alt_text} } await self._make_request( method="POST", endpoint="https://upload.twitter.com/1.1/media/metadata/create.json", data=metadata ) async def export_data( self, output_path: Path, data_types: List[str] = None, max_items: int = 1000 ) -> int: """Export Twitter data""" if data_types is None: data_types = ['tweets', 'likes', 'bookmarks', 'following'] logger.info(f"Starting Twitter export", output_path=str(output_path), data_types=data_types) output_path.mkdir(parents=True, exist_ok=True) exported_count = 0 for data_type in data_types: try: output_file = output_path / f"{data_type}.json" if data_type == 'tweets': if self._authenticated_user: result = await self.get_user_tweets( self._authenticated_user.id, max_results=max_items ) data = [tweet.__dict__ for tweet in result['tweets']] else: continue elif data_type == 'likes': result = await self.get_liked_tweets(max_results=max_items) data = [tweet.__dict__ for tweet in result['tweets']] elif data_type == 'bookmarks': result = await self.get_bookmarks(max_results=max_items) data = [tweet.__dict__ for tweet in result['tweets']] elif data_type == 'following': result = await self.get_following(max_results=max_items) data = [user.__dict__ for user in result['users']] else: logger.warning(f"Unknown data type: {data_type}") continue # Convert datetime objects to ISO strings for JSON serialization def serialize_datetime(obj): if isinstance(obj, datetime): return obj.isoformat() return obj serialized_data = json.loads(json.dumps(data, default=serialize_datetime)) with open(output_file, 'w', encoding='utf-8') as f: json.dump(serialized_data, f, indent=2, ensure_ascii=False) logger.info(f"Exported {data_type}", count=len(data), output_file=str(output_file)) exported_count += len(data) except Exception as e: logger.error(f"Failed to export {data_type}", error=str(e)) continue logger.info(f"Twitter export completed", exported_count=exported_count) return exported_count

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vitalune/Nexus-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server