mcp-twikit

from fastmcp import FastMCP, Context import twikit import os from pathlib import Path import logging from typing import Optional, List import time # Create an MCP server mcp = FastMCP("mcp-twikit") logger = logging.getLogger(__name__) httpx_logger = logging.getLogger("httpx") httpx_logger.setLevel(logging.WARNING) USERNAME = os.getenv('TWITTER_USERNAME') EMAIL = os.getenv('TWITTER_EMAIL') PASSWORD = os.getenv('TWITTER_PASSWORD') USER_AGENT = os.getenv('USER_AGENT') COOKIES_PATH = Path.home() / '.mcp-twikit' / 'cookies.json' # Rate limit tracking RATE_LIMITS = {} RATE_LIMIT_WINDOW = 15 * 60 # 15 minutes in seconds async def get_twitter_client() -> twikit.Client: """Initialize and return an authenticated Twitter client.""" client = twikit.Client('en-US', user_agent=USER_AGENT) if COOKIES_PATH.exists(): client.load_cookies(COOKIES_PATH) else: try: await client.login( auth_info_1=USERNAME, auth_info_2=EMAIL, password=PASSWORD ) except Exception as e: logger.error(f"Failed to login: {e}") raise COOKIES_PATH.parent.mkdir(parents=True, exist_ok=True) client.save_cookies(COOKIES_PATH) return client def check_rate_limit(endpoint: str) -> bool: """Check if we're within rate limits for a given endpoint.""" now = time.time() if endpoint not in RATE_LIMITS: RATE_LIMITS[endpoint] = [] # Remove old timestamps RATE_LIMITS[endpoint] = [t for t in RATE_LIMITS[endpoint] if now - t < RATE_LIMIT_WINDOW] # Check limits based on endpoint if endpoint == 'tweet': return len(RATE_LIMITS[endpoint]) < 300 # 300 tweets per 15 minutes elif endpoint == 'dm': return len(RATE_LIMITS[endpoint]) < 1000 # 1000 DMs per 15 minutes return True # Existing search and read tools @mcp.tool() async def search_twitter(query: str, sort_by: str = 'Top', count: int = 10, ctx: Context = None) -> str: """Search twitter with a query. Sort by 'Top' or 'Latest'""" try: client = await get_twitter_client() tweets = await client.search_tweet(query, product=sort_by, count=count) return convert_tweets_to_markdown(tweets) except Exception as e: logger.error(f"Failed to search tweets: {e}") return f"Failed to search tweets: {e}" @mcp.tool() async def get_user_tweets(username: str, tweet_type: str = 'Tweets', count: int = 10, ctx: Context = None) -> str: """Get tweets from a specific user's timeline.""" try: client = await get_twitter_client() username = username.lstrip('@') user = await client.get_user_by_screen_name(username) if not user: return f"Could not find user {username}" tweets = await client.get_user_tweets( user_id=user.id, tweet_type=tweet_type, count=count ) return convert_tweets_to_markdown(tweets) except Exception as e: logger.error(f"Failed to get user tweets: {e}") return f"Failed to get user tweets: {e}" @mcp.tool() async def get_timeline(count: int = 20) -> str: """Get tweets from your home timeline (For You).""" try: client = await get_twitter_client() tweets = await client.get_timeline(count=count) return convert_tweets_to_markdown(tweets) except Exception as e: logger.error(f"Failed to get timeline: {e}") return f"Failed to get timeline: {e}" @mcp.tool() async def get_latest_timeline(count: int = 20) -> str: """Get tweets from your home timeline (Following).""" try: client = await get_twitter_client() tweets = await client.get_latest_timeline(count=count) return convert_tweets_to_markdown(tweets) except Exception as e: logger.error(f"Failed to get latest timeline: {e}") return f"Failed to get latest timeline: {e}" # New write tools @mcp.tool() async def post_tweet( text: str, media_paths: Optional[List[str]] = None, reply_to: Optional[str] = None, tags: Optional[List[str]] = None ) -> str: """Post a tweet with optional media, reply, and tags.""" try: if not check_rate_limit('tweet'): return "Rate limit exceeded for tweets. Please wait before posting again." client = await get_twitter_client() # Handle tags by converting to mentions if tags: mentions = ' '.join(f"@{tag.lstrip('@')}" for tag in tags) text = f"""{text} {mentions}""" # Upload media if provided media_ids = [] if media_paths: for path in media_paths: media_id = await client.upload_media(path, wait_for_completion=True) media_ids.append(media_id) # Create the tweet tweet = await client.create_tweet( text=text, media_ids=media_ids if media_ids else None, reply_to=reply_to ) RATE_LIMITS.setdefault('tweet', []).append(time.time()) return f"Successfully posted tweet: {tweet.id}" except Exception as e: logger.error(f"Failed to post tweet: {e}") return f"Failed to post tweet: {e}" @mcp.tool() async def delete_tweet(tweet_id: str) -> str: """Delete a tweet by its ID.""" try: client = await get_twitter_client() await client.delete_tweet(tweet_id) return f"Successfully deleted tweet {tweet_id}" except Exception as e: logger.error(f"Failed to delete tweet: {e}") return f"Failed to delete tweet: {e}" @mcp.tool() async def send_dm(user_id: str, message: str, media_path: Optional[str] = None) -> str: """Send a direct message to a user.""" try: if not check_rate_limit('dm'): return "Rate limit exceeded for DMs. Please wait before sending again." client = await get_twitter_client() media_id = None if media_path: media_id = await client.upload_media(media_path, wait_for_completion=True) await client.send_dm( user_id=user_id, text=message, media_id=media_id ) RATE_LIMITS.setdefault('dm', []).append(time.time()) return f"Successfully sent DM to user {user_id}" except Exception as e: logger.error(f"Failed to send DM: {e}") return f"Failed to send DM: {e}" @mcp.tool() async def delete_dm(message_id: str) -> str: """Delete a direct message by its ID.""" try: client = await get_twitter_client() await client.delete_dm(message_id) return f"Successfully deleted DM {message_id}" except Exception as e: logger.error(f"Failed to delete DM: {e}") return f"Failed to delete DM: {e}" def convert_tweets_to_markdown(tweets) -> str: """Convert a list of tweets to markdown format.""" result = [] for tweet in tweets: result.append(f"### @{tweet.user.screen_name}") result.append(f"**{tweet.created_at}**") result.append(tweet.text) if tweet.media: for media in tweet.media: result.append(f"![media]({media.url})") result.append("---") return "\n".join(result)