"""Format X API responses as clean text for MCP tool output."""
from __future__ import annotations
from datetime import datetime
def _ts(iso: str) -> str:
    """Render an ISO-8601 timestamp as a compact ``[YYYY-MM-DD HH:MM]`` tag.

    Every ``Z`` in the input is rewritten to ``+00:00`` before parsing. Input
    that cannot be parsed (including non-string values such as ``None``)
    yields ``"[unknown]"``.
    """
    try:
        parsed = datetime.fromisoformat(iso.replace("Z", "+00:00"))
        stamp = format(parsed, "[%Y-%m-%d %H:%M]")
    except (ValueError, AttributeError):
        stamp = "[unknown]"
    return stamp
def _metrics(m: dict | None) -> str:
    """Summarise a ``public_metrics`` mapping as an inline suffix.

    Returns text such as ``" (3 replies, 10 likes)"`` (with a leading space),
    or ``""`` when ``m`` is empty/``None`` or every counter is missing or
    falsy. Counters that are zero are omitted.
    """
    if not m:
        return ""

    # (metric key, display label), in the order they appear in the output.
    labelled = (
        ("reply_count", "replies"),
        ("retweet_count", "RTs"),
        ("like_count", "likes"),
        ("quote_count", "quotes"),
        ("impression_count", "views"),
    )
    shown = [f"{m[key]} {label}" for key, label in labelled if m.get(key)]
    if not shown:
        return ""
    return " (" + ", ".join(shown) + ")"
def _resolve_author(tweet: dict, includes: dict | None) -> str:
    """Resolve a tweet's ``author_id`` to ``@username`` using ``includes``.

    Args:
        tweet: Tweet dict; only ``author_id`` is read.
        includes: Optional expansion payload whose ``users`` list holds user
            dicts with ``id`` and ``username``.

    Returns:
        ``"@<username>"`` when a user with a matching ``id`` and a non-empty
        ``username`` is found, ``"user:<author_id>"`` when the id is known
        but cannot be resolved, and ``"@unknown"`` when the tweet has no
        ``author_id``.
    """
    author_id = tweet.get("author_id")
    if not author_id:
        # Without an id there is nothing to match on; looking up ``None``
        # would wrongly match any user entry that lacks an ``id`` field.
        return "@unknown"

    users = (includes or {}).get("users") or []
    for user in users:
        username = user.get("username")
        # Entries without a username fall through to the ``user:<id>`` form
        # instead of raising ``KeyError``.
        if user.get("id") == author_id and username:
            return f"@{username}"
    return f"user:{author_id}"
def format_tweet(tweet: dict, includes: dict | None = None) -> str:
    """Render one tweet as a single line of text.

    The line has the shape
    ``<timestamp> <author>: <prefix><text><metrics> (ID: <id>)``. Newlines in
    the text are flattened to spaces and the text is stripped; text longer
    than 500 characters is cut and suffixed with ``...``. ``includes`` is
    passed through to the author lookup.
    """
    stamp = _ts(tweet.get("created_at", ""))
    who = _resolve_author(tweet, includes)

    body = tweet.get("text", "").replace("\n", " ").strip()
    if len(body) > 500:
        body = f"{body[:500]}..."

    stats = _metrics(tweet.get("public_metrics"))
    tweet_id = tweet.get("id", "?")

    # Mark retweets / quotes / replies. When several references are present,
    # the last recognised one determines the marker.
    marker = ""
    for reference in tweet.get("referenced_tweets", []):
        kind = reference.get("type")
        if kind == "retweeted":
            marker = "RT "
        elif kind == "quoted":
            marker = "QT "
        elif kind == "replied_to":
            marker = f"reply to {reference.get('id', '?')} "

    return f"{stamp} {who}: {marker}{body}{stats} (ID: {tweet_id})"
def format_tweet_list(data: dict) -> str:
    """Render the tweets of an X API response, one per line, with a footer.

    Reads the tweet list from ``data["data"]``, the expansions from
    ``data["includes"]`` and paging information from ``data["meta"]``.
    Returns ``"No tweets found."`` when there are no tweets. The footer
    reports ``meta["result_count"]`` (falling back to the number of tweets)
    and, when present, ``meta["next_token"]``.
    """
    tweets = data.get("data", [])
    if not tweets:
        return "No tweets found."

    expansions = data.get("includes")
    rendered = "\n".join([format_tweet(item, expansions) for item in tweets])

    meta = data.get("meta", {})
    shown = meta.get("result_count", len(tweets))
    cursor = meta.get("next_token")

    summary = f"\n---\n{shown} tweets shown."
    if cursor:
        summary = f"{summary} Next page: {cursor}"
    return rendered + summary
def format_user(user: dict) -> str:
    """Render a user profile as a short multi-line summary.

    Always emits the ``name (@username)`` and ``ID`` lines. The bio
    (newlines flattened, cut to 200 characters plus ``...``), the follower /
    following / tweet counters, the join date and the verified flag are each
    added only when the corresponding field is present and truthy.
    """
    display_name = user.get("name", "Unknown")
    handle = user.get("username", "?")

    bio = user.get("description", "").replace("\n", " ")
    if len(bio) > 200:
        bio = f"{bio[:200]}..."

    stats = user.get("public_metrics", {})

    out = [f"{display_name} (@{handle})", f"ID: {user.get('id', '?')}"]
    if bio:
        out.append(f"Bio: {bio}")
    if stats:
        followers = stats.get("followers_count", 0)
        following = stats.get("following_count", 0)
        tweet_total = stats.get("tweet_count", 0)
        out.append(
            f"Followers: {followers} | Following: {following} | Tweets: {tweet_total}"
        )
    if user.get("created_at"):
        out.append(f"Joined: {_ts(user['created_at'])}")
    if user.get("verified"):
        out.append("Verified: Yes")
    return "\n".join(out)
def format_user_list(data: dict) -> str:
    """Render ``data["data"]`` as a numbered list of users, one per line.

    Each line shows the name, handle, follower count (0 when
    ``public_metrics`` or ``followers_count`` is absent) and id. Returns
    ``"No users found."`` when the list is missing or empty.
    """
    users = data.get("data", [])
    if not users:
        return "No users found."

    def _row(position: int, entry: dict) -> str:
        follower_total = entry.get("public_metrics", {}).get("followers_count", 0)
        identity = f"{entry.get('name', '?')} (@{entry.get('username', '?')})"
        return (
            f"{position}. {identity} "
            f"— {follower_total} followers (ID: {entry.get('id', '?')})"
        )

    return "\n".join(_row(n, entry) for n, entry in enumerate(users, start=1))
def format_dm_events(data: dict) -> str:
    """Render the DM events in ``data["data"]``, one per line.

    The events are emitted in the reverse of the order they were received.
    Each line carries the timestamp, the sender id, the conversation id and
    the message text with newlines flattened to spaces. Returns
    ``"No DM messages found."`` when there are no events.
    """
    events = data.get("data", [])
    if not events:
        return "No DM messages found."

    def _row(event: dict) -> str:
        stamp = _ts(event.get("created_at", ""))
        sender_id = event.get("sender_id", "?")
        body = event.get("text", "").replace("\n", " ")
        conversation = event.get("dm_conversation_id", "?")
        return f"{stamp} user:{sender_id} [conv:{conversation}]: {body}"

    # Reverse the received order so the output reads chronologically.
    return "\n".join([_row(event) for event in reversed(events)])
def _conv_sort_key(conv: dict) -> int:
    """Return a conversation's ``sort_timestamp`` as an int (0 if unusable)."""
    try:
        return int(conv.get("sort_timestamp") or 0)
    except (TypeError, ValueError):
        return 0


def _conv_stamp(sort_ts: object) -> str:
    """Render ``sort_timestamp`` as ``[YYYY-MM-DD HH:MM]`` in local time.

    Returns ``""`` when the value is missing/falsy and ``"[?]"`` when it
    cannot be converted.
    """
    if not sort_ts:
        return ""
    try:
        # Values longer than 10 characters are treated as milliseconds,
        # shorter ones as seconds.
        seconds = int(sort_ts) / 1000 if len(str(sort_ts)) > 10 else int(sort_ts)
        return datetime.fromtimestamp(seconds).strftime("[%Y-%m-%d %H:%M]")
    except (TypeError, ValueError, OverflowError, OSError):
        return "[?]"


def format_internal_conversations(
    conversations: dict, users: dict, entries: list, limit: int = 20
) -> str:
    """Format conversations from v1.1 inbox_initial_state response.

    Args:
        conversations: Mapping whose values are conversation dicts. The
            fields read are ``conversation_id``, ``type``, ``participants``
            (each with ``user_id``), ``sort_timestamp`` and
            ``dm_secret_conversations_enabled``.
        users: Mapping of user id to a user dict with ``name`` and
            ``screen_name``; used to label participants.
        entries: Not read by this function; kept so the signature stays
            compatible with existing callers.
        limit: Maximum number of conversations to include.

    Returns:
        A header followed by one two-line entry per conversation, most
        recent first, separated by blank lines; or
        ``"No DM conversations found."`` when ``conversations`` is empty.
    """
    if not conversations:
        return "No DM conversations found."

    # Build user map
    user_map: dict[str, str] = {}
    for uid, udata in (users or {}).items():
        name = udata.get("name", "")
        screen = udata.get("screen_name", "")
        user_map[uid] = f"{name} (@{screen})" if screen else f"user:{uid}"

    # Sort by last message time (most recent first). The key is numeric so
    # that timestamps of different lengths, or a mix of str and int values,
    # order correctly instead of lexicographically.
    sorted_convos = sorted(
        conversations.values(), key=_conv_sort_key, reverse=True
    )[:limit]

    lines = [f"DM Conversations ({len(sorted_convos)} shown):\n"]
    for conv in sorted_convos:
        cid = conv.get("conversation_id", "?")
        conv_type = conv.get("type", "ONE_TO_ONE")
        encrypted = conv.get("dm_secret_conversations_enabled", False)

        # Participants
        pnames = []
        for p in conv.get("participants", []):
            uid = p.get("user_id", "")
            pnames.append(user_map.get(uid, f"user:{uid}"))

        last_ts = _conv_stamp(conv.get("sort_timestamp"))
        enc_badge = " [ENCRYPTED]" if encrypted else ""
        type_badge = " [GROUP]" if conv_type == "GROUP_DM" else ""
        lines.append(
            f"{last_ts} {', '.join(pnames)}{type_badge}{enc_badge}\n"
            f" conv_id: {cid}"
        )
    return "\n\n".join(lines)
def _msg_time_key(msg: dict) -> int:
    """Return ``message_data.time`` as an int for sorting (0 if unusable)."""
    try:
        return int((msg.get("message_data") or {}).get("time") or 0)
    except (TypeError, ValueError):
        return 0


def format_internal_messages(
    messages: list[dict], users: dict, conversation_id: str
) -> str:
    """Format messages from v1.1 conversation response.

    Args:
        messages: Message dicts; each is read through its ``message_data``
            sub-dict (``sender_id``, ``text``, ``time`` in milliseconds,
            ``attachment``, ``dm_secret_conversations_enabled``).
        users: Mapping of user id to a user dict with ``screen_name``; used
            to label senders.
        conversation_id: Identifier shown in the header.

    Returns:
        A header line, a blank line, then one line per message in
        chronological order; or ``"No messages in conversation <id>."`` when
        ``messages`` is empty. Timestamps are rendered in local time.
    """
    if not messages:
        return f"No messages in conversation {conversation_id}."

    # Build user map
    user_map: dict[str, str] = {}
    for uid, udata in (users or {}).items():
        screen = udata.get("screen_name", "")
        user_map[uid] = f"@{screen}" if screen else f"user:{uid}"

    # Sort chronologically. The key is numeric so timestamps of different
    # lengths (or a mix of str and int) do not sort lexicographically.
    sorted_msgs = sorted(messages, key=_msg_time_key)

    lines = [f"Conversation {conversation_id} ({len(sorted_msgs)} messages):\n"]
    for msg in sorted_msgs:
        msg_data = msg.get("message_data") or {}
        sender_id = msg_data.get("sender_id", "?")
        sender = user_map.get(sender_id, f"user:{sender_id}")
        text = msg_data.get("text", "")

        time_ms = msg_data.get("time", "")
        ts_fmt = ""
        if time_ms:
            try:
                dt = datetime.fromtimestamp(int(time_ms) / 1000)
                ts_fmt = dt.strftime("[%Y-%m-%d %H:%M]")
            except (TypeError, ValueError, OverflowError, OSError):
                ts_fmt = "[?]"

        # Substitute a placeholder when there is no text to show.
        encrypted = msg_data.get("dm_secret_conversations_enabled", False)
        if not text and encrypted:
            text = "[E2E encrypted message — decryption key needed]"
        elif not text:
            attachment = msg_data.get("attachment", {})
            if attachment:
                text = f"[attachment: {attachment.get('type', 'unknown')}]"
            else:
                text = "[empty message]"

        # Truncate long messages, then flatten newlines so each message
        # stays on one output line.
        if len(text) > 500:
            text = text[:500] + "..."
        text = text.replace("\n", " ")
        lines.append(f"{ts_fmt} {sender}: {text}")
    return "\n".join(lines)
def format_xchat_conversations(items: list, limit: int = 20) -> str:
    """Render XChat conversations from a GetInitialXChatPageQuery response.

    At most ``limit`` items are shown. Each entry lists the participants
    (``name (@screen_name)``, or ``user:<rest_id>`` when no screen name is
    present), the number of ``latest_message_events`` and the conversation
    id. Returns ``"No encrypted conversations found."`` for an empty list.
    """
    if not items:
        return "No encrypted conversations found."

    def _label(participant: dict) -> str:
        profile = participant.get("result", {})
        core = profile.get("core", {})
        display = core.get("name", "")
        handle = core.get("screen_name", "")
        if not handle:
            return f"user:{profile.get('rest_id', '?')}"
        return f"{display} (@{handle})"

    blocks = []
    for entry in items[:limit]:
        detail = entry.get("conversation_detail", {})
        conversation = detail.get("conversation_id", "?")
        who = ", ".join(
            [_label(p) for p in detail.get("participants_results", [])]
        )
        event_total = len(entry.get("latest_message_events", []))
        blocks.append(
            f"[ENCRYPTED] {who} ({event_total} msgs)\n"
            f" conv_id: {conversation}"
        )
    return "\n\n".join(blocks)
def _xchat_time_key(msg: dict) -> int:
    """Return a message's ``timestamp_ms`` as an int for sorting (0 if unusable)."""
    try:
        return int(msg.get("timestamp_ms") or 0)
    except (TypeError, ValueError):
        return 0


def format_xchat_messages(
    messages: list[dict],
    participants: list[dict],
    conversation_id: str,
    has_key: bool = False,
    limit: int = 50,
) -> str:
    """Format decoded XChat messages (from Thrift) for display.

    Args:
        messages: Message dicts with ``sender_id``, ``timestamp_ms``,
            ``decrypted_text`` and ``encrypted_text``.
        participants: Participant dicts; ``result.rest_id`` and
            ``result.core.screen_name`` are used to label senders.
        conversation_id: Identifier shown in the header.
        has_key: Selects the ``[DECRYPTED]`` or
            ``[ENCRYPTED - no private key]`` header badge.
        limit: Maximum number of messages to include.

    Returns:
        A header line, a blank line, then one line per message in
        chronological order; or ``"No messages in conversation <id>."`` when
        ``messages`` is empty. Timestamps are rendered in local time.
    """
    if not messages:
        return f"No messages in conversation {conversation_id}."

    # Build participant map
    user_map: dict[str, str] = {}
    for p in participants:
        result = p.get("result", {})
        uid = result.get("rest_id", "")
        screen = result.get("core", {}).get("screen_name", "")
        user_map[uid] = f"@{screen}" if screen else f"user:{uid}"

    # Sort by timestamp. The key is numeric so timestamps of different
    # lengths (or a mix of str and int) do not sort lexicographically.
    # NOTE(review): slicing after an ascending sort keeps the *oldest*
    # ``limit`` messages; confirm that is intended rather than the newest.
    sorted_msgs = sorted(messages, key=_xchat_time_key)[:limit]

    header = f"Conversation {conversation_id} ({len(sorted_msgs)} messages)"
    header += " [DECRYPTED]" if has_key else " [ENCRYPTED - no private key]"
    lines = [header + ":\n"]

    for msg in sorted_msgs:
        sender_id = msg.get("sender_id", "?")
        sender = user_map.get(sender_id, f"user:{sender_id}")

        ts_ms = msg.get("timestamp_ms", "")
        ts_fmt = ""
        if ts_ms:
            try:
                dt = datetime.fromtimestamp(int(ts_ms) / 1000)
                ts_fmt = dt.strftime("[%Y-%m-%d %H:%M]")
            except (TypeError, ValueError, OverflowError, OSError):
                ts_fmt = "[?]"

        text = msg.get("decrypted_text", "")
        if not text:
            # No plaintext: say whether ciphertext was present at all.
            text = "[encrypted message]" if msg.get("encrypted_text", "") else "[empty]"

        # Truncate long messages, then flatten newlines so each message
        # stays on one output line.
        if len(text) > 500:
            text = text[:500] + "..."
        text = text.replace("\n", " ")
        lines.append(f"{ts_fmt} {sender}: {text}")
    return "\n".join(lines)
def format_grok_response(data: dict) -> str:
    """Extract the readable text from a Grok Responses API payload.

    Walks ``data["output"]``, and for every item of type ``message``
    collects its ``output_text`` blocks and its ``refusal`` blocks (rendered
    as ``[Refused: ...]``). The collected parts are joined with blank lines.
    When nothing is collected, ``str(data)`` is returned as a fallback.
    """
    collected = []
    for item in data.get("output", []):
        if item.get("type") != "message":
            continue
        for block in item.get("content", []):
            kind = block.get("type")
            if kind == "output_text":
                collected.append(block.get("text", ""))
            elif kind == "refusal":
                collected.append(f"[Refused: {block.get('refusal', '')}]")

    # Fallback: nothing recognisable was found, so show the raw payload.
    return "\n\n".join(collected) if collected else str(data)