from typing import Any, Dict, List, Optional
import os
import httpx
import requests
from requests_oauthlib import OAuth1
from mcp.server.fastmcp import FastMCP
# ----------------------------------
# MCP Server
# ----------------------------------
# Server instance named "twitter". The tool functions in this file register
# with it through the @mcp.tool decorator, and main() starts it.
mcp = FastMCP("twitter")
# ----------------------------------
# Twitter API Configuration
# ----------------------------------
# Base URL for the v2 endpoints; the tools append paths such as
# "/tweets/search/recent" and "/tweets" to it.
TWITTER_API_BASE = "https://api.twitter.com/2"
# v1.1 media upload endpoint, used by _upload_image.
TWITTER_MEDIA_UPLOAD_URL = "https://upload.twitter.com/1.1/media/upload.json"
# Every credential below is read from the environment once, when this module
# is imported; os.getenv yields None for a variable that is not set.
# App-only bearer token (READ-ONLY)
TWITTER_APP_BEARER_TOKEN = os.getenv("TWITTER_BEARER_TOKEN")
# OAuth 1.0a credentials (WRITE)
# NOTE: two constant names differ from their environment variable names:
#   TWITTER_API_SECRET          <- TWITTER_API_SECRET_KEY
#   TWITTER_ACCESS_TOKEN_SECRET <- TWITTER_SECRET_ACCESS_TOKEN
TWITTER_API_KEY = os.getenv("TWITTER_API_KEY")
TWITTER_API_SECRET = os.getenv("TWITTER_API_SECRET_KEY")
TWITTER_ACCESS_TOKEN = os.getenv("TWITTER_ACCESS_TOKEN")
TWITTER_ACCESS_TOKEN_SECRET = os.getenv("TWITTER_SECRET_ACCESS_TOKEN")
# ----------------------------------
# Helpers
# ----------------------------------
def _get_app_headers() -> Dict[str, str]:
    """Build the request headers for app-only (bearer token) calls.

    Returns:
        A dict holding ``Authorization`` and ``Content-Type`` entries, or an
        empty dict when ``TWITTER_APP_BEARER_TOKEN`` is unset or empty.
    """
    token = TWITTER_APP_BEARER_TOKEN
    headers: Dict[str, str] = {}
    if token:
        headers["Authorization"] = f"Bearer {token}"
        headers["Content-Type"] = "application/json"
    return headers
def _get_oauth1() -> Optional[OAuth1]:
    """Create the OAuth 1.0a signer from the four configured credentials.

    Returns:
        An ``OAuth1`` instance built from the API key, API secret, access
        token and access token secret (in that positional order), or
        ``None`` when any one of them is unset or empty.
    """
    credentials = (
        TWITTER_API_KEY,
        TWITTER_API_SECRET,
        TWITTER_ACCESS_TOKEN,
        TWITTER_ACCESS_TOKEN_SECRET,
    )
    # A single missing value makes the whole credential set unusable.
    if any(not value for value in credentials):
        return None
    return OAuth1(*credentials)
def _upload_image(image_path: str) -> str:
    """Upload a local image file and return the ``media_id_string`` it gets.

    Args:
        image_path: Path of the file to send as the ``media`` form part.

    Returns:
        The ``media_id_string`` value from the JSON response.

    Raises:
        RuntimeError: If the OAuth credentials are not configured.
        FileNotFoundError: If ``image_path`` does not exist.
        requests.HTTPError: If the upload endpoint answers with an error
            status (raised by ``raise_for_status``).
    """
    auth = _get_oauth1()
    if not auth:
        raise RuntimeError("OAuth credentials not configured")
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"Image not found: {image_path}")

    # The file has to stay open while requests streams it as multipart data.
    with open(image_path, "rb") as image_file:
        upload = requests.post(
            TWITTER_MEDIA_UPLOAD_URL,
            auth=auth,
            files={"media": image_file},
            timeout=30,
        )

    upload.raise_for_status()
    return upload.json()["media_id_string"]
# ----------------------------------
# Tools
# ----------------------------------
@mcp.tool(
    name="search_tweets",
    description="Search recent public tweets (read-only)"
)
async def search_tweets(
    query: str,
    max_results: int = 10
) -> Dict[str, Any]:
    """Search recent public tweets using the app-only bearer token.

    Args:
        query: Search query forwarded to the recent-search endpoint.
        max_results: Maximum number of tweets to return. Values above 100
            are capped at 100.

    Returns:
        ``{"query", "count", "tweets"}`` on success, where each tweet has
        ``id``, ``text``, ``author_id``, ``created_at`` and ``metrics``.
        ``{"error": ...}`` when the bearer token is missing, when
        ``max_results`` is not positive, or when the request fails.
    """
    headers = _get_app_headers()
    if not headers:
        return {"error": "TWITTER_BEARER_TOKEN not configured"}
    if max_results < 1:
        return {"error": "max_results must be a positive integer"}

    # The recent-search endpoint only accepts max_results between 10 and 100,
    # so smaller requests ask the API for 10 and are trimmed locally below.
    wanted = min(max_results, 100)
    url = f"{TWITTER_API_BASE}/tweets/search/recent"
    params = {
        "query": query,
        "max_results": max(wanted, 10),
        "tweet.fields": "author_id,created_at,public_metrics",
    }

    try:
        async with httpx.AsyncClient(timeout=10) as client:
            response = await client.get(url, headers=headers, params=params)
            response.raise_for_status()
    except httpx.HTTPError as exc:
        # Covers transport failures and error statuses; report them the same
        # way the other failure paths do instead of raising.
        return {"error": str(exc)}

    data = response.json()
    tweets = [
        {
            "id": t["id"],
            "text": t["text"],
            "author_id": t["author_id"],
            "created_at": t["created_at"],
            "metrics": t.get("public_metrics", {}),
        }
        # "data" is read defensively: the key may be missing from the body.
        for t in data.get("data", [])[:wanted]
    ]
    return {
        "query": query,
        "count": len(tweets),
        "tweets": tweets,
    }
@mcp.tool(
    name="upload_media",
    description="Upload an image to Twitter and return media_id"
)
def upload_media(image_path: str) -> Dict[str, Any]:
    """Upload an image and report the outcome as a plain dict.

    Args:
        image_path: Path of the image file handed to ``_upload_image``.

    Returns:
        ``{"media_id": ...}`` on success, or ``{"error": <message>}`` when
        ``_upload_image`` raises any exception.
    """
    try:
        uploaded_id = _upload_image(image_path)
    except Exception as exc:
        # Tool boundary: every failure is returned as data, never raised.
        return {"error": str(exc)}
    return {"media_id": uploaded_id}
@mcp.tool(
    name="create_tweet",
    description="Create a tweet (supports optional images)"
)
def create_tweet(
    text: str,
    media_ids: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Post a tweet, optionally attaching previously uploaded media.

    Args:
        text: Tweet body; rejected when ``len(text)`` exceeds 280.
        media_ids: Optional media ids to attach (for example the value
            returned by ``upload_media``).

    Returns:
        ``{"status": "posted", "tweet_id", "text", "media_ids"}`` on
        success, otherwise ``{"error": <message>}``.
    """
    if len(text) > 280:
        return {"error": "Tweet exceeds 280 characters"}

    oauth = _get_oauth1()
    if not oauth:
        return {"error": "OAuth credentials not configured"}

    payload: Dict[str, Any] = {"text": text}
    if media_ids:
        payload["media"] = {"media_ids": media_ids}

    try:
        response = requests.post(
            f"{TWITTER_API_BASE}/tweets",
            auth=oauth,
            json=payload,
            timeout=30,
        )
        if response.status_code == 403:
            return {"error": "Permission denied (check Read/Write access)"}
        response.raise_for_status()
    except requests.RequestException as exc:
        # Connection problems, timeouts and non-403 error statuses are
        # reported like every other failure of this tool: as an error dict.
        return {"error": str(exc)}

    data = response.json()
    return {
        "status": "posted",
        "tweet_id": data["data"]["id"],
        "text": data["data"]["text"],
        "media_ids": media_ids or [],
    }
@mcp.tool(
    name="summarize_tweets",
    description="Summarize tweets"
)
def summarize_tweets(
    tweets: List[Dict[str, Any]],
    style: str = "neutral"
) -> Dict[str, Any]:
    """Package tweet texts together with a summarization instruction.

    No summary is generated here; the function returns the instruction
    matching ``style`` plus the raw tweet texts.

    Args:
        tweets: Tweet dicts; each must contain a ``"text"`` key.
        style: ``"neutral"``, ``"technical"`` or ``"executive"``. Any other
            value falls back to the neutral instruction.

    Returns:
        ``{"summary": "No tweets provided"}`` for an empty input, otherwise
        a dict with ``style``, ``instruction``, ``tweet_count`` and
        ``tweets`` (the list of texts).
    """
    if not tweets:
        return {"summary": "No tweets provided"}

    neutral_prompt = "Summarize the discussion briefly"
    prompts = {
        "neutral": neutral_prompt,
        "technical": "Focus on technical concepts",
        "executive": "Focus on business insights",
    }
    instruction = prompts.get(style, neutral_prompt)
    texts = [tweet["text"] for tweet in tweets]

    return {
        "style": style,
        "instruction": instruction,
        "tweet_count": len(tweets),
        "tweets": texts,
    }
@mcp.tool(
    name="analyze_engagement",
    description="Analyze tweet engagement"
)
def analyze_engagement(
    tweets: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """Find the tweet with the highest combined engagement.

    Engagement is likes + retweets + replies, read from each tweet's
    optional ``"metrics"`` dict; a missing count is treated as 0.

    Args:
        tweets: Tweet dicts; each must contain ``"id"`` and ``"text"``.

    Returns:
        ``{"error": "No tweets provided"}`` for an empty input, otherwise
        ``{"tweet_count", "top_tweet"}`` where ``top_tweet`` holds ``id``,
        ``text``, ``likes``, ``retweets`` and ``replies``.
    """
    if not tweets:
        return {"error": "No tweets provided"}

    def _row(tweet: Dict[str, Any]) -> Dict[str, Any]:
        # Flatten one tweet into the fields reported back to the caller.
        counts = tweet.get("metrics", {})
        return {
            "id": tweet["id"],
            "text": tweet["text"],
            "likes": counts.get("like_count", 0),
            "retweets": counts.get("retweet_count", 0),
            "replies": counts.get("reply_count", 0),
        }

    def _score(row: Dict[str, Any]) -> int:
        return row["likes"] + row["retweets"] + row["replies"]

    rows = [_row(tweet) for tweet in tweets]
    # max() keeps the first row among equal scores.
    best = max(rows, key=_score, default=None)
    return {
        "tweet_count": len(rows),
        "top_tweet": best,
    }
@mcp.tool(
    name="extract_topics",
    description="Extract hashtags and keywords"
)
def extract_topics(
    tweets: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """Count hashtags and longer words across the tweet texts.

    Each text is lower-cased and split on whitespace. Tokens starting with
    ``#`` count as hashtags; other tokens longer than four characters count
    as keywords. A tweet without a ``"text"`` key contributes nothing.

    Args:
        tweets: Tweet dicts, each optionally holding a ``"text"`` string.

    Returns:
        ``{"top_hashtags", "top_keywords"}``: up to 5 hashtags and up to 10
        keywords, most frequent first (ties keep first-seen order).
    """
    tag_counts: Dict[str, int] = {}
    word_counts: Dict[str, int] = {}

    for tweet in tweets:
        tokens = tweet.get("text", "").lower().split()
        for token in tokens:
            if token.startswith("#"):
                bucket = tag_counts
            elif len(token) > 4:
                bucket = word_counts
            else:
                continue
            bucket[token] = bucket.get(token, 0) + 1

    def _ranked(counts: Dict[str, int], limit: int) -> List[str]:
        # The sort is stable, so equal counts stay in insertion order.
        ordered = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
        return [name for name, _ in ordered[:limit]]

    return {
        "top_hashtags": _ranked(tag_counts, 5),
        "top_keywords": _ranked(word_counts, 10),
    }
# ----------------------------------
# Server Entry Point
# ----------------------------------
def main() -> None:
    """Run the MCP server using the stdio transport."""
    mcp.run(transport="stdio")


if __name__ == "__main__":
    main()