from typing import Any, Dict
import os
import httpx
import requests
from requests_oauthlib import OAuth1
from mcp.server.fastmcp import FastMCP
# ----------------------------------
# MCP Server
# ----------------------------------
# Shared FastMCP server instance, created with the name "twitter".
# Tools are registered on it through the @mcp.tool decorator.
mcp = FastMCP("twitter")
# ----------------------------------
# Twitter API Configuration
# ----------------------------------
# Base URL that every endpoint path in this module is appended to.
TWITTER_API_BASE = "https://api.twitter.com/2"

# Read path: app-only bearer token (read-only). None when the variable is unset.
TWITTER_APP_BEARER_TOKEN = os.environ.get("TWITTER_BEARER_TOKEN")

# Write path: the four OAuth 1.0a credentials. Each is None when unset.
TWITTER_API_KEY = os.environ.get("TWITTER_API_KEY")
TWITTER_API_SECRET = os.environ.get("TWITTER_API_SECRET_KEY")
TWITTER_ACCESS_TOKEN = os.environ.get("TWITTER_ACCESS_TOKEN")
TWITTER_ACCESS_TOKEN_SECRET = os.environ.get("TWITTER_SECRET_ACCESS_TOKEN")
# ----------------------------------
# Helpers
# ----------------------------------
def _get_app_headers() -> Dict[str, str]:
    """
    Build the HTTP headers for read-only operations (search, read tweets).

    Returns an empty dict when no app-only bearer token is configured, so
    callers can treat a falsy result as "not configured".
    """
    token = TWITTER_APP_BEARER_TOKEN
    if token:
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
    return {}
def _get_oauth1() -> OAuth1 | None:
    """
    Build the OAuth 1.0a auth object for write operations (create tweet).

    Returns None when any of the four credentials is unset or empty.
    """
    credentials = (
        TWITTER_API_KEY,
        TWITTER_API_SECRET,
        TWITTER_ACCESS_TOKEN,
        TWITTER_ACCESS_TOKEN_SECRET,
    )
    # A single missing (None) or empty credential makes signing impossible.
    for value in credentials:
        if not value:
            return None
    return OAuth1(*credentials)
# ----------------------------------
# Tools
# ----------------------------------
@mcp.tool(
    name="search_tweets",
    description="Search recent public tweets (read-only)"
)
async def search_tweets(
    query: str,
    max_results: int = 10
) -> Dict[str, Any]:
    """Search recent public tweets using the app-only bearer token.

    Args:
        query: Search query, forwarded unchanged to the recent-search
            endpoint. Must contain at least one non-whitespace character.
        max_results: Maximum number of tweets to return (at least 1).
            The endpoint only accepts page sizes from 10 to 100, so the
            value sent to the API is clamped into that range and the
            result is trimmed back to the requested count.

    Returns:
        On success, a dict with the keys "query", "count" and "tweets".
        On a configuration or argument problem, a dict with a single
        "error" key.

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.
    """
    headers = _get_app_headers()
    if not headers:
        return {
            "error": "TWITTER_BEARER_TOKEN not configured (app-only token required)."
        }
    if not query.strip():
        return {"error": "query must not be empty."}
    if max_results < 1:
        return {"error": "max_results must be at least 1."}

    url = f"{TWITTER_API_BASE}/tweets/search/recent"
    params = {
        "query": query,
        # The endpoint rejects page sizes outside 10..100, so clamp here
        # and trim to the caller's requested count further down.
        "max_results": max(10, min(max_results, 100)),
        "tweet.fields": "author_id,created_at,public_metrics",
    }
    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.get(url, headers=headers, params=params)
    response.raise_for_status()
    data = response.json()

    # "data" is absent from the payload when nothing matched.
    tweets = [
        {
            "id": t["id"],
            "text": t["text"],
            "author_id": t.get("author_id"),
            "created_at": t.get("created_at"),
            "metrics": t.get("public_metrics", {}),
        }
        for t in data.get("data", [])[:max_results]
    ]
    return {
        "query": query,
        "count": len(tweets),
        "tweets": tweets,
    }
@mcp.tool(
    name="create_tweet",
    description="Create a new tweet using OAuth 1.0a (Access Token + Secret)"
)
def create_tweet(text: str) -> Dict[str, Any]:
    """Post a new tweet signed with the OAuth 1.0a credentials.

    Args:
        text: Tweet body. Must be non-blank and at most 280 characters
            as measured by len().

    Returns:
        On success, a dict with "status" ("posted"), "tweet_id" and
        "text". On a validation, configuration or permission problem,
        a dict with a single "error" key.

    Raises:
        requests.HTTPError: If the API answers with an error status
            other than 403.
        requests.RequestException: On connection failure or timeout.
    """
    if not text.strip():
        return {"error": "Tweet text must not be empty."}
    # NOTE(review): len() counts code points; the platform's own length
    # rules may weight some characters differently -- confirm if exact
    # parity with the server-side limit matters.
    if len(text) > 280:
        return {"error": "Tweet exceeds 280 characters."}

    oauth = _get_oauth1()
    if not oauth:
        # Name the environment variables this module actually reads.
        return {
            "error": (
                "OAuth 1.0a credentials not configured.\n"
                "Set TWITTER_API_KEY, TWITTER_API_SECRET_KEY, "
                "TWITTER_ACCESS_TOKEN, and TWITTER_SECRET_ACCESS_TOKEN."
            )
        }

    url = f"{TWITTER_API_BASE}/tweets"
    payload = {"text": text}
    # Without a timeout requests waits indefinitely on a stalled
    # connection; 10 seconds matches the read path.
    response = requests.post(url, auth=oauth, json=payload, timeout=10)
    if response.status_code == 403:
        return {
            "error": "Permission denied. Ensure Read/Write access is enabled."
        }
    response.raise_for_status()

    data = response.json()
    return {
        "status": "posted",
        "tweet_id": data["data"]["id"],
        "text": data["data"]["text"],
    }
# ----------------------------------
# Server Entry Point
# ----------------------------------
def main() -> None:
    """Run the MCP server using the stdio transport."""
    mcp.run(transport="stdio")


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()