We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/Zo-Valentine/mcp-twikit'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
from fastmcp import FastMCP, Context
import twikit
import os
from pathlib import Path
import logging
from typing import Optional, List
import time
# Create the FastMCP server instance named "mcp-twikit"; the tool functions
# in this file register themselves on it through the @mcp.tool() decorator.
mcp = FastMCP("mcp-twikit")
# Module logger, plus the "httpx" logger restricted to WARNING and above.
logger = logging.getLogger(__name__)
httpx_logger = logging.getLogger("httpx")
httpx_logger.setLevel(logging.WARNING)

# Twitter credentials and client settings read from the environment;
# each value is None when its variable is not set.
USERNAME = os.environ.get('TWITTER_USERNAME')
EMAIL = os.environ.get('TWITTER_EMAIL')
PASSWORD = os.environ.get('TWITTER_PASSWORD')
USER_AGENT = os.environ.get('USER_AGENT')

# Session cookies are stored under the user's home directory.
COOKIES_PATH = Path.home().joinpath('.mcp-twikit', 'cookies.json')

# Rate limit tracking: endpoint name -> list of time.time() stamps, one per
# recorded call.
RATE_LIMITS = {}
RATE_LIMIT_WINDOW = 900  # 15 minutes in seconds
async def get_twitter_client() -> twikit.Client:
    """Build a twikit client that is ready to make authenticated calls.

    If the cookie file at COOKIES_PATH exists, its cookies are loaded and no
    login is performed. Otherwise the client logs in with USERNAME, EMAIL and
    PASSWORD and the resulting cookies are written to COOKIES_PATH.

    Returns:
        A new ``twikit.Client`` created with the 'en-US' locale and USER_AGENT.

    Raises:
        Exception: Whatever ``client.login`` raised; it is logged and then
            re-raised unchanged.
    """
    session = twikit.Client('en-US', user_agent=USER_AGENT)

    # Fast path: reuse the saved session instead of logging in again.
    if COOKIES_PATH.exists():
        session.load_cookies(COOKIES_PATH)
        return session

    try:
        await session.login(
            auth_info_1=USERNAME,
            auth_info_2=EMAIL,
            password=PASSWORD,
        )
    except Exception as exc:
        logger.error(f"Failed to login: {exc}")
        raise

    # Persist the fresh session; the parent directory may not exist yet.
    COOKIES_PATH.parent.mkdir(parents=True, exist_ok=True)
    session.save_cookies(COOKIES_PATH)
    return session
def check_rate_limit(endpoint: str) -> bool:
    """Report whether another call to ``endpoint`` fits in the current window.

    Timestamps older than RATE_LIMIT_WINDOW seconds are dropped from
    RATE_LIMITS[endpoint] as a side effect (the entry is created if missing).
    This function does not record a new call; callers append the timestamp.

    Args:
        endpoint: Endpoint name. 'tweet' and 'dm' have limits; any other
            name is always allowed.

    Returns:
        True if the number of timestamps still in the window is below the
        endpoint's limit, or if the endpoint has no limit.
    """
    current = time.time()

    # Keep only the calls that are still inside the window.
    recent = [
        stamp
        for stamp in RATE_LIMITS.get(endpoint, [])
        if current - stamp < RATE_LIMIT_WINDOW
    ]
    RATE_LIMITS[endpoint] = recent

    # Maximum calls per window for each limited endpoint.
    ceilings = {
        'tweet': 300,  # 300 tweets per 15 minutes
        'dm': 1000,  # 1000 DMs per 15 minutes
    }
    ceiling = ceilings.get(endpoint)
    if ceiling is None:
        return True
    return len(recent) < ceiling
# Existing search and read tools
@mcp.tool()
async def search_twitter(query: str, sort_by: str = 'Top', count: int = 10, ctx: Context = None) -> str:
    """Search twitter with a query and return the results as markdown.

    Args:
        query: Search query, passed unchanged to ``client.search_tweet``.
        sort_by: Result ordering, 'Top' or 'Latest'; forwarded as ``product``.
        count: Number of tweets to request.
        ctx: MCP context; not used by this tool.

    Returns:
        The matching tweets rendered by ``convert_tweets_to_markdown``, or a
        "Failed to search tweets: ..." message if any step raises.
    """
    try:
        twitter = await get_twitter_client()
        found = await twitter.search_tweet(query, product=sort_by, count=count)
        return convert_tweets_to_markdown(found)
    except Exception as exc:
        # Errors are reported to the caller as text rather than propagated.
        failure = f"Failed to search tweets: {exc}"
        logger.error(failure)
        return failure
@mcp.tool()
async def get_user_tweets(username: str, tweet_type: str = 'Tweets', count: int = 10, ctx: Context = None) -> str:
    """Get tweets from a specific user's timeline as markdown.

    Args:
        username: Screen name of the user; leading '@' characters are removed.
        tweet_type: Forwarded to ``client.get_user_tweets`` as ``tweet_type``.
        count: Number of tweets to request.
        ctx: MCP context; not used by this tool.

    Returns:
        The user's tweets rendered by ``convert_tweets_to_markdown``, a
        "Could not find user ..." message when the lookup returns a falsy
        value, or a "Failed to get user tweets: ..." message if any step
        raises.
    """
    try:
        twitter = await get_twitter_client()

        # Resolve the screen name to a user first; tweets are fetched by id.
        handle = username.lstrip('@')
        account = await twitter.get_user_by_screen_name(handle)
        if not account:
            return f"Could not find user {handle}"

        timeline = await twitter.get_user_tweets(
            user_id=account.id,
            tweet_type=tweet_type,
            count=count,
        )
        return convert_tweets_to_markdown(timeline)
    except Exception as exc:
        # Errors are reported to the caller as text rather than propagated.
        failure = f"Failed to get user tweets: {exc}"
        logger.error(failure)
        return failure
@mcp.tool()
async def get_timeline(count: int = 20) -> str:
    """Get tweets from your home timeline (For You) as markdown.

    Args:
        count: Number of tweets to request from ``client.get_timeline``.

    Returns:
        The tweets rendered by ``convert_tweets_to_markdown``, or a
        "Failed to get timeline: ..." message if any step raises.
    """
    try:
        twitter = await get_twitter_client()
        feed = await twitter.get_timeline(count=count)
        return convert_tweets_to_markdown(feed)
    except Exception as exc:
        # Errors are reported to the caller as text rather than propagated.
        failure = f"Failed to get timeline: {exc}"
        logger.error(failure)
        return failure
@mcp.tool()
async def get_latest_timeline(count: int = 20) -> str:
    """Get tweets from your home timeline (Following) as markdown.

    Args:
        count: Number of tweets to request from
            ``client.get_latest_timeline``.

    Returns:
        The tweets rendered by ``convert_tweets_to_markdown``, or a
        "Failed to get latest timeline: ..." message if any step raises.
    """
    try:
        twitter = await get_twitter_client()
        feed = await twitter.get_latest_timeline(count=count)
        return convert_tweets_to_markdown(feed)
    except Exception as exc:
        # Errors are reported to the caller as text rather than propagated.
        failure = f"Failed to get latest timeline: {exc}"
        logger.error(failure)
        return failure
# New write tools
@mcp.tool()
async def post_tweet(
    text: str,
    media_paths: Optional[List[str]] = None,
    reply_to: Optional[str] = None,
    tags: Optional[List[str]] = None
) -> str:
    """Post a tweet with optional media, reply, and tags.

    Args:
        text: Body of the tweet.
        media_paths: Paths of files to upload with ``client.upload_media``
            and attach to the tweet.
        reply_to: Forwarded to ``client.create_tweet`` as ``reply_to``.
        tags: Usernames to mention. Each is normalised to a single leading
            '@' and the mentions are appended on a new line after ``text``.

    Returns:
        "Successfully posted tweet: <id>" on success, the rate-limit message
        when ``check_rate_limit('tweet')`` is False, or a
        "Failed to post tweet: ..." message if any step raises.
    """
    try:
        if not check_rate_limit('tweet'):
            return "Rate limit exceeded for tweets. Please wait before posting again."

        client = await get_twitter_client()

        # Handle tags by converting to mentions
        if tags:
            mentions = ' '.join(f"@{tag.lstrip('@')}" for tag in tags)
            # The separator must be the "\n" escape: a raw line break inside a
            # single-quoted f-string is a syntax error.
            text = f"{text}\n{mentions}"

        # Upload media if provided
        media_ids = []
        if media_paths:
            for path in media_paths:
                media_id = await client.upload_media(path, wait_for_completion=True)
                media_ids.append(media_id)

        # Create the tweet; pass None rather than an empty list when there is
        # no media.
        tweet = await client.create_tweet(
            text=text,
            media_ids=media_ids if media_ids else None,
            reply_to=reply_to
        )

        # Only a successfully created tweet counts against the rate limit.
        RATE_LIMITS.setdefault('tweet', []).append(time.time())
        return f"Successfully posted tweet: {tweet.id}"
    except Exception as e:
        logger.error(f"Failed to post tweet: {e}")
        return f"Failed to post tweet: {e}"
@mcp.tool()
async def delete_tweet(tweet_id: str) -> str:
    """Delete a tweet by its ID.

    Args:
        tweet_id: ID passed to ``client.delete_tweet``.

    Returns:
        "Successfully deleted tweet <id>" on success, or a
        "Failed to delete tweet: ..." message if any step raises.
    """
    try:
        twitter = await get_twitter_client()
        await twitter.delete_tweet(tweet_id)
    except Exception as exc:
        # Errors are reported to the caller as text rather than propagated.
        failure = f"Failed to delete tweet: {exc}"
        logger.error(failure)
        return failure
    return f"Successfully deleted tweet {tweet_id}"
@mcp.tool()
async def send_dm(user_id: str, message: str, media_path: Optional[str] = None) -> str:
    """Send a direct message to a user, optionally with one media file.

    Args:
        user_id: Recipient, forwarded to ``client.send_dm`` as ``user_id``.
        message: Text of the message.
        media_path: Path of a file to upload with ``client.upload_media``
            and attach; nothing is uploaded when this is falsy.

    Returns:
        "Successfully sent DM to user <id>" on success, the rate-limit
        message when ``check_rate_limit('dm')`` is False, or a
        "Failed to send DM: ..." message if any step raises.
    """
    try:
        if not check_rate_limit('dm'):
            return "Rate limit exceeded for DMs. Please wait before sending again."

        twitter = await get_twitter_client()

        attachment = (
            await twitter.upload_media(media_path, wait_for_completion=True)
            if media_path
            else None
        )

        await twitter.send_dm(
            user_id=user_id,
            text=message,
            media_id=attachment,
        )

        # Record the send only after it succeeded.
        RATE_LIMITS.setdefault('dm', []).append(time.time())
        return f"Successfully sent DM to user {user_id}"
    except Exception as exc:
        # Errors are reported to the caller as text rather than propagated.
        failure = f"Failed to send DM: {exc}"
        logger.error(failure)
        return failure
@mcp.tool()
async def delete_dm(message_id: str) -> str:
    """Delete a direct message by its ID.

    Args:
        message_id: ID passed to ``client.delete_dm``.

    Returns:
        "Successfully deleted DM <id>" on success, or a
        "Failed to delete DM: ..." message if any step raises.
    """
    try:
        twitter = await get_twitter_client()
        await twitter.delete_dm(message_id)
    except Exception as exc:
        # Errors are reported to the caller as text rather than propagated.
        failure = f"Failed to delete DM: {exc}"
        logger.error(failure)
        return failure
    return f"Successfully deleted DM {message_id}"
def convert_tweets_to_markdown(tweets) -> str:
    """Render an iterable of tweets as a single markdown string.

    Each tweet contributes a "### @<screen_name>" heading, its creation time
    in bold, its text, one empty line per media item, and a "---" separator.

    Args:
        tweets: Iterable of objects exposing ``user.screen_name``,
            ``created_at``, ``text`` and ``media``.

    Returns:
        All lines joined with newlines; an empty string for no tweets.
    """
    lines = []
    for item in tweets:
        lines.extend((
            f"### @{item.user.screen_name}",
            f"**{item.created_at}**",
            item.text,
        ))
        if item.media:
            # NOTE(review): every media item yields an empty string here; this
            # looks like a markdown image template that was lost — confirm
            # against the upstream source before relying on it.
            lines.extend("" for _ in item.media)
        lines.append("---")
    return "\n".join(lines)