#!/usr/bin/env python3
"""
Engagio MCP Server
Twitter engagement intelligence - read via twitterapi.io, write via Twitter API
Persistent storage via Supabase
"""
import os
import json
import math
import time
from datetime import datetime, timedelta
from typing import Any
import requests
from requests_oauthlib import OAuth1
from dotenv import load_dotenv
from supabase import create_client, Client
from mcp.server.fastmcp import FastMCP
# Load environment variables
load_dotenv()
# Initialize MCP server
mcp = FastMCP("engagio")
# API Credentials
TWITTERAPI_IO_TOKEN: str = os.getenv("TWITTERAPI_IO_TOKEN", "")
TWITTER_API_KEY: str = os.getenv("TWITTER_API_KEY", "")
TWITTER_API_KEY_SECRET: str = os.getenv("TWITTER_API_KEY_SECRET", "")
TWITTER_ACCESS_TOKEN: str = os.getenv("TWITTER_ACCESS_TOKEN", "")
TWITTER_ACCESS_TOKEN_SECRET: str = os.getenv("TWITTER_ACCESS_TOKEN_SECRET", "")
SUPABASE_URL: str = os.getenv("SUPABASE_URL", "")
SUPABASE_KEY: str = os.getenv("SUPABASE_KEY", "")
# Twitter OAuth1 for posting
twitter_auth = OAuth1(
TWITTER_API_KEY,
TWITTER_API_KEY_SECRET,
TWITTER_ACCESS_TOKEN,
TWITTER_ACCESS_TOKEN_SECRET
)
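# Note: posting via https://api.twitter.com/2/tweets requires user-context auth; OAuth 1.0a
# user context (the four credentials above) is one of the supported options.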
# Supabase client
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
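# The Supabase schema is assumed to exist already (it is not created here); the tables and
# columns below are inferred from the queries in this file:
#   monitored_accounts(username, user_id, name, followers, bio, notes, last_fetched)
#   monitored_topics(topic, type)
#   tweets(id, author_username, author_user_id, text, likes, retweets, replies, views,
#          posted_at, engagement_score, is_retweet, is_thread, thread_id, author_followers)
#   replies(id, original_tweet_id, original_author, reply_text, reply_tweet_id, reply_tone,
#           likes_received, replies_received, replied_at, hour_of_day, day_of_week)
#   reply_queue(id, tweet_id, reply_text, scheduled_for, status, posted_at)
#   api_usage(endpoint, credits_used, accounts_count, tweets_count, called_at)
# Timestamp columns that are read but never written here (replied_at, called_at) are assumed
# to default to now() in the database.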
# Global rate limiter
_last_api_call: float = 0
_min_delay: float = 3.0 # Minimum seconds between API calls
_rate_limit_backoff: float = 10.0 # Initial backoff on rate limit
# Credit costs per endpoint (based on twitterapi.io pricing)
ENDPOINT_CREDITS: dict[str, int] = {
"user/info": 18, # Single user lookup
"user/batch_info_by_ids": 10, # Batch user (10 per user for 100+, we use 10 as estimate)
"user/last_tweets": 15, # User timeline
"tweets": 15, # Get tweets by IDs
"tweet/advanced_search": 15, # Search tweets
}
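# Rough cost math, using the credit table above and the ~$0.0001/credit estimate assumed by
# engagio_get_cost_report(): one timeline fetch (user/last_tweets) is 15 credits ≈ $0.0015,
# so fetching timelines for a 50-account watchlist is ~750 credits ≈ $0.075 per pass.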
def _log_api_usage(endpoint: str, credits: int, accounts_count: int | None = None, tweets_count: int | None = None) -> None:
"""Log API usage to database for cost tracking"""
try:
supabase.table("api_usage").insert({
"endpoint": endpoint,
"credits_used": credits,
"accounts_count": accounts_count,
"tweets_count": tweets_count
}).execute()
except Exception:
pass # Don't fail main operation if logging fails
# ============ twitterapi.io Client ============
def twitterapi_get(endpoint: str, params: dict[str, Any] | None = None, max_retries: int = 3) -> dict[str, Any]:
"""Make a GET request to twitterapi.io with automatic rate limiting, retry, and cost tracking"""
global _last_api_call, _rate_limit_backoff
url = f"https://api.twitterapi.io/twitter/{endpoint}"
headers = {"X-API-Key": TWITTERAPI_IO_TOKEN}
for attempt in range(max_retries):
# Enforce minimum delay between calls
elapsed = time.time() - _last_api_call
if elapsed < _min_delay:
time.sleep(_min_delay - elapsed)
_last_api_call = time.time()
        resp = requests.get(url, headers=headers, params=params, timeout=30)  # timeout so a hung request can't stall the server
if resp.status_code == 429:
if attempt < max_retries - 1:
# Exponential backoff
wait_time = _rate_limit_backoff * (2 ** attempt)
time.sleep(wait_time)
continue
return {"status": "error", "msg": "Rate limited. Please wait a moment and try again."}
if resp.status_code != 200:
return {"status": "error", "msg": f"API error: {resp.status_code}"}
# Reset backoff on success
_rate_limit_backoff = 10.0
result = resp.json()
# Log API usage for cost tracking
if result.get("status") != "error":
base_credits = ENDPOINT_CREDITS.get(endpoint, 15)
accounts_count = None
tweets_count = None
# Calculate credits based on endpoint and response
if endpoint == "user/batch_info_by_ids":
users = result.get("users", [])
accounts_count = len(users)
# Batch pricing: 10 credits/user for 100+, else 18/user
credits = accounts_count * (10 if accounts_count >= 100 else 18)
elif endpoint == "user/info":
accounts_count = 1
credits = base_credits
elif endpoint == "user/last_tweets":
tweets = result.get("data", {}).get("tweets", [])
tweets_count = len(tweets)
credits = base_credits
elif endpoint == "tweets":
tweets = result.get("tweets", [])
tweets_count = len(tweets)
credits = base_credits
elif endpoint == "tweet/advanced_search":
tweets = result.get("tweets", [])
tweets_count = len(tweets)
credits = base_credits
else:
credits = base_credits
_log_api_usage(endpoint, credits, accounts_count, tweets_count)
return result
return {"status": "error", "msg": "Max retries exceeded"}
def twitterapi_get_user(username: str) -> dict[str, Any]:
"""Get user info from twitterapi.io"""
return twitterapi_get("user/info", {"userName": username})
def twitterapi_get_users_batch(user_ids: list[str]) -> dict[str, Any]:
"""
Batch get user info from twitterapi.io.
Much more cost-effective than individual calls:
- Single: 18 credits/user
- Batch 100+: 10 credits/user (44% cheaper)
Returns: {"users": [...], "status": "success"} or {"status": "error", "msg": "..."}
"""
if not user_ids:
return {"users": [], "status": "success"}
# Pass user IDs as comma-separated string
ids_str = ",".join(user_ids)
return twitterapi_get("user/batch_info_by_ids", {"userIds": ids_str})
def twitterapi_get_user_tweets(user_id: str, cursor: str | None = None) -> dict[str, Any]:
"""Get tweets from a user via twitterapi.io"""
params: dict[str, Any] = {"userId": user_id}
if cursor:
params["cursor"] = cursor
return twitterapi_get("user/last_tweets", params)
def twitterapi_get_tweets_by_ids(tweet_ids: list[str]) -> dict[str, Any]:
"""Get tweet details by IDs from twitterapi.io"""
if not tweet_ids:
return {"tweets": []}
ids_str = ",".join(tweet_ids)
return twitterapi_get("tweets", {"tweet_ids": ids_str})
# ============ Twitter Official API Client ============
def twitter_post_tweet(text: str, reply_to_id: str | None = None) -> dict[str, Any]:
"""Post a tweet using Twitter's official API"""
url = "https://api.twitter.com/2/tweets"
payload: dict[str, Any] = {"text": text}
if reply_to_id:
payload["reply"] = {"in_reply_to_tweet_id": reply_to_id}
    resp = requests.post(url, auth=twitter_auth, json=payload, timeout=30)  # timeout to avoid hanging on a slow API response
return {"status": resp.status_code, "data": resp.json()}
# ============ Scoring Algorithm ============
def calculate_engagement_score(tweet: dict[str, Any], author_followers: int = 0) -> float:
"""
Calculate engagement score based on velocity and author reach.
    Score = (likes + retweets*2 + replies*3) / sqrt(minutes_old + 1) * follower_multiplier
    The follower multiplier uses a log scale so huge accounts don't dominate; the raw score
    is then doubled and capped at 100.
"""
likes = tweet.get("likeCount") or tweet.get("likes") or 0
retweets = tweet.get("retweetCount") or tweet.get("retweets") or 0
replies = tweet.get("replyCount") or tweet.get("replies") or 0
posted_at_str = tweet.get("createdAt", "")
try:
posted_at = datetime.strptime(posted_at_str, "%a %b %d %H:%M:%S %z %Y")
minutes_old = (datetime.now(posted_at.tzinfo) - posted_at).total_seconds() / 60
except (ValueError, TypeError):
minutes_old = 60
engagement = likes + (retweets * 2) + (replies * 3)
base_score = engagement / math.sqrt(minutes_old + 1)
# Apply follower multiplier (log scale: 1k=1.3x, 10k=1.4x, 100k=1.5x, 1M=1.6x)
follower_multiplier = 1 + math.log10(max(author_followers, 1000)) / 10
score = base_score * follower_multiplier
normalized = min(100, score * 2)
return round(normalized, 1)
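# Worked example: a tweet with 50 likes, 10 retweets and 5 replies, posted 30 minutes ago
# by an author with 10,000 followers:
#   engagement = 50 + 10*2 + 5*3 = 85
#   base_score = 85 / sqrt(31) ≈ 15.3
#   multiplier = 1 + log10(10_000)/10 = 1.4
#   score ≈ 21.4 -> normalized = min(100, 2 * 21.4) ≈ 42.7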
def parse_twitter_timestamp(ts: str) -> str | None:
"""Convert Twitter timestamp to ISO format for Supabase"""
try:
dt = datetime.strptime(ts, "%a %b %d %H:%M:%S %z %Y")
return dt.isoformat()
except (ValueError, TypeError):
return None
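# Twitter's createdAt uses the classic ctime-style layout, e.g.:
#   parse_twitter_timestamp("Sun Feb 06 03:35:05 +0000 2022") -> "2022-02-06T03:35:05+00:00"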
# ============ MCP Tools ============
@mcp.tool()
def engagio_add_account(username: str) -> str:
"""Add a Twitter account to monitor for engagement opportunities"""
username = username.lower().replace("@", "")
# Check if already exists
existing = supabase.table("monitored_accounts").select("*").eq("username", username).execute()
if existing.data:
return f"@{username} is already being monitored"
# Get user info from twitterapi.io
result = twitterapi_get_user(username)
if result.get("status") != "success":
return f"Error: Could not find user @{username}. {result.get('msg', '')}"
user_data = result.get("data", {})
bio = user_data.get("description", "")
# Insert into Supabase
supabase.table("monitored_accounts").insert({
"username": username,
"user_id": user_data.get("id"),
"name": user_data.get("name"),
"followers": user_data.get("followers", 0),
"bio": bio
}).execute()
return f"Added @{username} (ID: {user_data.get('id')}, {user_data.get('followers', 0):,} followers) to monitored accounts\nBio: {bio[:100]}{'...' if len(bio) > 100 else ''}"
@mcp.tool()
def engagio_add_accounts_bulk(usernames: list[str]) -> str:
"""
Add multiple Twitter accounts with rate limit handling and retry logic.
Note: Uses individual API calls (batch requires user_ids which we don't have yet).
For refreshing existing accounts, use engagio_refresh_accounts() which uses batch API.
"""
results = []
backoff_times = [5, 15, 30, 60, 120] # Seconds to wait on rate limit
for username in usernames:
username = username.lower().replace("@", "")
# Check if already exists
existing = supabase.table("monitored_accounts").select("*").eq("username", username).execute()
if existing.data:
results.append(f"@{username}: already monitored")
continue
# Retry with exponential backoff
success = False
result = {}
for attempt in range(len(backoff_times)):
result = twitterapi_get_user(username)
if result.get("status") == "success":
user_data = result.get("data", {})
bio = user_data.get("description", "")
supabase.table("monitored_accounts").insert({
"username": username,
"user_id": user_data.get("id"),
"name": user_data.get("name"),
"followers": user_data.get("followers", 0),
"bio": bio
}).execute()
results.append(f"@{username}: ✓ ({user_data.get('followers', 0):,} followers)")
success = True
break
elif "rate limit" in result.get("msg", "").lower():
wait_time = backoff_times[attempt]
time.sleep(wait_time)
else:
results.append(f"@{username}: ✗ {result.get('msg', 'not found')}")
break
if not success and "rate limit" in result.get("msg", "").lower():
results.append(f"@{username}: ✗ rate limited after {len(backoff_times)} attempts")
time.sleep(2) # 2s delay between accounts to avoid rate limits
added = sum(1 for r in results if "✓" in r)
return f"Bulk add complete: {added}/{len(usernames)} added\n\n" + "\n".join(results)
@mcp.tool()
def engagio_remove_account(username: str) -> str:
"""Remove a Twitter account from monitoring"""
username = username.lower().replace("@", "")
result = supabase.table("monitored_accounts").delete().eq("username", username).execute()
if result.data:
return f"Removed @{username} from monitored accounts"
return f"@{username} was not being monitored"
@mcp.tool()
def engagio_remove_accounts_bulk(usernames: list[str]) -> str:
"""Remove multiple Twitter accounts from monitoring"""
results = []
for username in usernames:
username = username.lower().replace("@", "")
result = supabase.table("monitored_accounts").delete().eq("username", username).execute()
if result.data:
results.append(f"@{username}: ✓ removed")
else:
results.append(f"@{username}: ✗ not found")
removed = sum(1 for r in results if "✓" in r)
return f"Bulk remove complete: {removed}/{len(usernames)} removed\n\n" + "\n".join(results)
@mcp.tool()
def engagio_list_accounts() -> str:
"""List all monitored Twitter accounts"""
result = supabase.table("monitored_accounts").select("*").order("followers", desc=True).execute()
if not result.data:
return "No accounts being monitored. Use engagio_add_account to add some."
lines = ["Monitored Accounts:", ""]
for account in result.data:
bio = account.get('bio', '')
bio_preview = f"\n Bio: {bio[:80]}..." if bio and len(bio) > 80 else (f"\n Bio: {bio}" if bio else "")
notes = account.get('notes', '')
notes_preview = f"\n Notes: {notes[:80]}..." if notes and len(notes) > 80 else (f"\n Notes: {notes}" if notes else "")
lines.append(f"@{account['username']} - {account.get('followers', 0):,} followers{bio_preview}{notes_preview}")
lines.append("")
return "\n".join(lines)
@mcp.tool()
def engagio_backfill_bios() -> str:
"""
Fetch and update bios for all monitored accounts that don't have one.
Uses batch API for efficiency (58 calls → 1 call, 98% savings).
"""
# Get accounts without bios
result = supabase.table("monitored_accounts").select("*").execute()
if not result.data:
return "No accounts to update."
accounts_to_update = [a for a in result.data if not a.get('bio')]
if not accounts_to_update:
return f"All {len(result.data)} accounts already have bios."
# Separate accounts with and without user_ids
accounts_with_ids = [a for a in accounts_to_update if a.get('user_id')]
accounts_without_ids = [a for a in accounts_to_update if not a.get('user_id')]
updated = 0
errors = []
# Batch fetch accounts that have user_ids (most efficient)
if accounts_with_ids:
user_ids = [a['user_id'] for a in accounts_with_ids]
batch_result = twitterapi_get_users_batch(user_ids)
if batch_result.get("status") != "error":
users = batch_result.get("users", [])
# Create lookup by user_id
user_map = {u.get("id"): u for u in users}
for account in accounts_with_ids:
user_data = user_map.get(account['user_id'])
if user_data:
supabase.table("monitored_accounts").update({
"bio": user_data.get("description", ""),
"followers": user_data.get("followers", account.get("followers", 0))
}).eq("username", account['username']).execute()
updated += 1
else:
errors.append(f"@{account['username']}: not found in batch response")
else:
errors.append(f"Batch API error: {batch_result.get('msg', 'unknown')}")
# Fallback to individual calls for accounts without user_ids
for account in accounts_without_ids:
username = account['username']
user_result = twitterapi_get_user(username)
if user_result.get("status") == "success":
user_data = user_result.get("data", {})
supabase.table("monitored_accounts").update({
"bio": user_data.get("description", ""),
"user_id": user_data.get("id"), # Also save the user_id for future batch calls
"followers": user_data.get("followers", account.get("followers", 0))
}).eq("username", username).execute()
updated += 1
else:
errors.append(f"@{username}: {user_result.get('msg', 'error')}")
result_msg = f"Updated bios for {updated}/{len(accounts_to_update)} accounts"
if accounts_with_ids:
result_msg += f" (batch: {len(accounts_with_ids)}, individual: {len(accounts_without_ids)})"
if errors:
result_msg += f"\n\nErrors:\n" + "\n".join(errors)
return result_msg
@mcp.tool()
def engagio_refresh_accounts() -> str:
"""
Batch refresh all monitored account profiles (bio, followers, etc).
    Uses the batch API for maximum efficiency: e.g. 58 accounts = 1 API call instead of 58.
Use this for:
- Daily profile refresh
- Updating follower counts
- Syncing bios after Twitter profile changes
"""
# Get all accounts with user_ids
result = supabase.table("monitored_accounts").select("*").execute()
if not result.data:
return "No accounts to refresh."
accounts_with_ids = [a for a in result.data if a.get('user_id')]
accounts_without_ids = [a for a in result.data if not a.get('user_id')]
if not accounts_with_ids:
return f"No accounts have user_ids stored. Run engagio_backfill_bios() first to populate user_ids."
# Batch fetch all accounts
user_ids = [a['user_id'] for a in accounts_with_ids]
batch_result = twitterapi_get_users_batch(user_ids)
if batch_result.get("status") == "error":
return f"Batch API error: {batch_result.get('msg', 'unknown')}"
users = batch_result.get("users", [])
user_map = {u.get("id"): u for u in users}
updated = 0
errors = []
for account in accounts_with_ids:
user_data = user_map.get(account['user_id'])
if user_data:
# Update all profile fields
supabase.table("monitored_accounts").update({
"bio": user_data.get("description", account.get("bio", "")),
"followers": user_data.get("followers", account.get("followers", 0)),
"name": user_data.get("name", account.get("name", ""))
}).eq("username", account['username']).execute()
updated += 1
else:
errors.append(f"@{account['username']}: not found in API response")
result_msg = f"Refreshed {updated}/{len(accounts_with_ids)} accounts in 1 API call"
if accounts_without_ids:
result_msg += f"\n\n{len(accounts_without_ids)} accounts skipped (no user_id). Run engagio_backfill_bios() to fix."
if errors:
result_msg += f"\n\nErrors:\n" + "\n".join(errors[:10]) # Limit error output
if len(errors) > 10:
result_msg += f"\n... and {len(errors) - 10} more"
return result_msg
@mcp.tool()
def engagio_update_account_notes(username: str, notes: str) -> str:
"""Update notes/accomplishments for a monitored account"""
username = username.lower().replace("@", "")
result = supabase.table("monitored_accounts").update({
"notes": notes
}).eq("username", username).execute()
if result.data:
return f"Updated notes for @{username}"
return f"@{username} not found in monitored accounts"
def _fetch_and_store_tweets(account: dict[str, Any]) -> int:
"""Internal helper to fetch tweets for one account and store them."""
author_followers = account.get("followers", 0)
result = twitterapi_get_user_tweets(account["user_id"])
if result.get("status") != "success":
return -1 # Error
tweets = result.get("data", {}).get("tweets", [])
count = 0
for tweet in tweets:
tweet_id = tweet.get("id")
if not tweet_id:
continue
text = tweet.get("text", "")
is_retweet = text.startswith("RT @")
in_reply_to = tweet.get("inReplyToId")
in_reply_to_user = tweet.get("inReplyToUserId")
is_thread = in_reply_to_user == account.get("user_id") if in_reply_to_user else False
score = calculate_engagement_score(tweet, author_followers)
supabase.table("tweets").upsert({
"id": tweet_id,
"author_username": account["username"],
"author_user_id": account.get("user_id"),
"text": text,
"likes": tweet.get("likeCount", 0),
"retweets": tweet.get("retweetCount", 0),
"replies": tweet.get("replyCount", 0),
"views": tweet.get("viewCount", 0),
"posted_at": parse_twitter_timestamp(tweet.get("createdAt", "")),
"engagement_score": score,
"is_retweet": is_retweet,
"is_thread": is_thread,
"thread_id": in_reply_to if is_thread else None,
"author_followers": author_followers
}).execute()
count += 1
# Update last_fetched timestamp
supabase.table("monitored_accounts").update({
"last_fetched": datetime.utcnow().isoformat()
}).eq("username", account["username"]).execute()
return count
@mcp.tool()
def engagio_fetch_tweets(
username: str | None = None,
hours: int = 6,
top_n: int | None = None,
skip_recent_hours: int = 2
) -> str:
"""
Fetch recent tweets from monitored accounts.
COST OPTIMIZATION:
- username: Fetch from single account only
- top_n: Only fetch from top N accounts by followers (e.g., top_n=10)
    - skip_recent_hours: Skip accounts fetched within X hours (default: 2)
    - hours: currently not applied as a filter
Examples:
- fetch_tweets(top_n=10) - Only top 10 accounts, ~$0.015
- fetch_tweets(username="levelsio") - Single account, ~$0.0015
- fetch_tweets() - All accounts (expensive!)
"""
if username:
username = username.lower().replace("@", "")
accounts = supabase.table("monitored_accounts").select("*").eq("username", username).execute()
if not accounts.data:
result = twitterapi_get_user(username)
if result.get("status") == "success":
accounts_to_fetch = [{
"username": username,
"user_id": result["data"]["id"],
"followers": result["data"].get("followers", 0)
}]
else:
return f"Error: @{username} not found"
else:
accounts_to_fetch = accounts.data
else:
# Get accounts, sorted by followers (highest first)
query = supabase.table("monitored_accounts").select("*").order("followers", desc=True)
if top_n:
query = query.limit(top_n)
accounts = query.execute()
accounts_to_fetch = accounts.data or []
# Filter out recently fetched accounts
if skip_recent_hours > 0:
cutoff = datetime.utcnow() - timedelta(hours=skip_recent_hours)
accounts_to_fetch = [
a for a in accounts_to_fetch
if not a.get("last_fetched") or
datetime.fromisoformat(a["last_fetched"].replace("Z", "+00:00")).replace(tzinfo=None) < cutoff
]
if not accounts_to_fetch:
return "No accounts to fetch (all recently fetched or none configured)."
total_fetched = 0
fetched_accounts = []
errors = []
for account in accounts_to_fetch:
count = _fetch_and_store_tweets(account)
if count >= 0:
total_fetched += count
fetched_accounts.append(f"@{account['username']}")
else:
errors.append(f"@{account['username']}")
result_msg = f"Fetched {total_fetched} tweets from {len(fetched_accounts)} account(s)"
if top_n:
result_msg += f" (top {top_n} by followers)"
if errors:
result_msg += f"\nErrors: {', '.join(errors)}"
return result_msg
@mcp.tool()
def engagio_fetch_top_roi(n: int = 10, skip_recent_hours: int = 2) -> str:
"""
Fetch tweets from top N accounts by YOUR engagement ROI.
ROI = likes received / replies made to that account.
This prioritizes accounts where your replies historically perform best,
not just accounts with the most followers.
"""
# Get ROI data from replies
cutoff_30d = (datetime.utcnow() - timedelta(days=30)).isoformat()
replies = supabase.table("replies")\
.select("original_author, likes_received")\
.gte("replied_at", cutoff_30d)\
.execute()
# Calculate ROI per account
account_roi: dict[str, dict[str, float]] = {}
for r in (replies.data or []):
author = r.get("original_author", "")
if not author:
continue
if author not in account_roi:
account_roi[author] = {"replies": 0, "likes": 0}
account_roi[author]["replies"] += 1
account_roi[author]["likes"] += r.get("likes_received", 0) or 0
# Calculate engagement rate
for stats in account_roi.values():
stats["rate"] = stats["likes"] / stats["replies"] if stats["replies"] > 0 else 0
# Sort by ROI and get top accounts
sorted_accounts = sorted(account_roi.items(), key=lambda x: x[1]["rate"], reverse=True)
top_usernames = [username for username, _ in sorted_accounts[:n * 2]] # Get extra in case some filtered
if not top_usernames:
return "No ROI data yet. Post some replies first, then run engagio_update_reply_performance()."
# Get account data for these usernames
accounts_result = supabase.table("monitored_accounts")\
.select("*")\
.in_("username", top_usernames)\
.execute()
if not accounts_result.data:
return "No monitored accounts match your top ROI accounts."
# Filter recently fetched and limit to n
accounts_to_fetch = []
if skip_recent_hours > 0:
cutoff = datetime.utcnow() - timedelta(hours=skip_recent_hours)
for account in accounts_result.data:
if not account.get("last_fetched") or \
datetime.fromisoformat(account["last_fetched"].replace("Z", "+00:00")).replace(tzinfo=None) < cutoff:
accounts_to_fetch.append(account)
if len(accounts_to_fetch) >= n:
break
else:
accounts_to_fetch = accounts_result.data[:n]
if not accounts_to_fetch:
return f"All top ROI accounts were recently fetched (within {skip_recent_hours}h)."
# Fetch tweets
total_fetched = 0
fetched_accounts = []
errors = []
for account in accounts_to_fetch:
count = _fetch_and_store_tweets(account)
roi_stats = account_roi.get(account["username"], {})
rate = roi_stats.get("rate", 0)
if count >= 0:
total_fetched += count
fetched_accounts.append(f"@{account['username']} (ROI: {rate:.1f})")
else:
errors.append(f"@{account['username']}")
result_msg = f"Fetched {total_fetched} tweets from {len(fetched_accounts)} top-ROI accounts:\n"
result_msg += "\n".join(fetched_accounts)
if errors:
result_msg += f"\n\nErrors: {', '.join(errors)}"
return result_msg
@mcp.tool()
def engagio_fetch_smart(budget: int = 100, skip_recent_hours: int = 2) -> str:
"""
Smart fetch within a credit budget. Prioritizes by:
1. Your engagement ROI (accounts where your replies perform best)
2. Time since last fetch (stale accounts first)
3. Account posting frequency (active accounts)
Budget is in credits (~15 credits per timeline fetch).
    Example: budget=100 covers up to 6 accounts (~$0.01)
"""
credits_per_fetch = 15
max_accounts = budget // credits_per_fetch
if max_accounts < 1:
return f"Budget too low. Minimum {credits_per_fetch} credits needed for 1 account."
# Get ROI data
cutoff_30d = (datetime.utcnow() - timedelta(days=30)).isoformat()
replies = supabase.table("replies")\
.select("original_author, likes_received")\
.gte("replied_at", cutoff_30d)\
.execute()
account_roi: dict[str, float] = {}
for r in (replies.data or []):
author = r.get("original_author", "")
if not author:
continue
if author not in account_roi:
account_roi[author] = 0
account_roi[author] += r.get("likes_received", 0) or 0
# Get all accounts
accounts = supabase.table("monitored_accounts").select("*").execute()
if not accounts.data:
return "No accounts to fetch."
# Score each account
now = datetime.utcnow()
scored_accounts = []
for account in accounts.data:
username = account["username"]
# ROI score (0-50 points)
roi = account_roi.get(username, 0)
roi_score = min(50, roi * 5) # Cap at 50
# Staleness score (0-30 points) - older = higher
last_fetched = account.get("last_fetched")
if last_fetched:
try:
last_dt = datetime.fromisoformat(last_fetched.replace("Z", "+00:00")).replace(tzinfo=None)
hours_old = (now - last_dt).total_seconds() / 3600
staleness_score = min(30, hours_old * 2) # 2 points per hour, cap at 30
except (ValueError, TypeError):
staleness_score = 30 # Unknown = treat as stale
else:
staleness_score = 30 # Never fetched = very stale
# Skip if too recent
if skip_recent_hours > 0 and staleness_score < skip_recent_hours * 2:
continue
# Follower score (0-20 points) - log scale
followers = account.get("followers", 0)
follower_score = min(20, math.log10(max(followers, 100)) * 4)
total_score = roi_score + staleness_score + follower_score
scored_accounts.append((account, total_score, roi_score, staleness_score))
if not scored_accounts:
return f"All accounts were recently fetched (within {skip_recent_hours}h)."
# Sort by score and take top accounts within budget
scored_accounts.sort(key=lambda x: x[1], reverse=True)
accounts_to_fetch = [a[0] for a in scored_accounts[:max_accounts]]
# Fetch tweets
total_fetched = 0
fetched_accounts = []
credits_used = 0
for account in accounts_to_fetch:
count = _fetch_and_store_tweets(account)
if count >= 0:
total_fetched += count
credits_used += credits_per_fetch
fetched_accounts.append(f"@{account['username']}")
result_msg = f"Smart fetch complete!\n"
result_msg += f"Budget: {budget} credits | Used: ~{credits_used} credits\n"
result_msg += f"Fetched {total_fetched} tweets from {len(fetched_accounts)} accounts\n"
result_msg += f"Accounts: {', '.join(fetched_accounts)}"
return result_msg
@mcp.tool()
def engagio_get_opportunities(
limit: int = 10,
min_score: float = 0,
max_replies: int = 100,
exclude_retweets: bool = True,
keywords: list[str] | None = None,
min_author_followers: int = 0,
hours: int = 24
) -> str:
"""
Get top engagement opportunities ranked by score.
Filters: min_score, max_replies, exclude_retweets, keywords, min_author_followers, hours.
"""
# Get tweets from last N hours, sorted by score
cutoff_time = (datetime.utcnow() - timedelta(hours=hours)).isoformat()
query = supabase.table("tweets")\
.select("*")\
.gte("posted_at", cutoff_time)\
.gte("engagement_score", min_score)\
.lte("replies", max_replies)
if exclude_retweets:
query = query.eq("is_retweet", False)
if min_author_followers > 0:
query = query.gte("author_followers", min_author_followers)
tweets_result = query.order("engagement_score", desc=True).limit(limit + 50).execute()
if not tweets_result.data:
# Check if we have any tweets at all to give better feedback
total_tweets = supabase.table("tweets").select("id", count="exact").execute()
tweet_count = total_tweets.count if hasattr(total_tweets, 'count') else len(total_tweets.data or [])
if tweet_count > 0:
return f"No tweets in the last {hours} hours. You have {tweet_count} older tweets cached. Try increasing hours parameter or run engagio_fetch_tweets."
return "No opportunities found. Run engagio_fetch_tweets first."
# Get IDs of tweets we've already replied to
replies_result = supabase.table("replies").select("original_tweet_id").execute()
replied_ids = {r["original_tweet_id"] for r in (replies_result.data or [])}
# Filter out replied tweets and apply keyword filter
opportunities = []
for t in tweets_result.data:
if t["id"] in replied_ids:
continue
if keywords:
text_lower = (t.get("text") or "").lower()
if not any(kw.lower() in text_lower for kw in keywords):
continue
opportunities.append(t)
if len(opportunities) >= limit:
break
if not opportunities:
return "No new opportunities found matching your criteria."
lines = [f"Top {len(opportunities)} Engagement Opportunities:", ""]
for i, tweet in enumerate(opportunities, 1):
score = tweet.get("engagement_score", 0)
username = tweet.get("author_username", "unknown")
followers = tweet.get("author_followers", 0)
text = (tweet.get("text") or "")[:150]
likes = tweet.get("likes", 0)
retweets = tweet.get("retweets", 0)
replies = tweet.get("replies", 0)
tweet_id = tweet.get("id")
posted = tweet.get("posted_at", "")[:19] if tweet.get("posted_at") else ""
lines.append(f"{i}. Score: {score} | @{username} ({followers:,} followers)")
lines.append(f' "{text}{"..." if len(tweet.get("text") or "") > 150 else ""}"')
lines.append(f" {likes} likes | {retweets} RTs | {replies} replies | {posted}")
lines.append(f" Tweet ID: {tweet_id}")
lines.append("")
return "\n".join(lines)
@mcp.tool()
def engagio_post_reply(tweet_id: str, text: str, tone: str | None = None) -> str:
"""Post a reply to a tweet and track it with timing data"""
# Get original tweet info
tweet_result = supabase.table("tweets").select("*").eq("id", tweet_id).execute()
original_author = tweet_result.data[0]["author_username"] if tweet_result.data else "unknown"
# Post the reply
result = twitter_post_tweet(text, reply_to_id=tweet_id)
if result["status"] == 201:
reply_id = result["data"].get("data", {}).get("id")
now = datetime.utcnow()
# Track the reply in Supabase with timing data
supabase.table("replies").insert({
"original_tweet_id": tweet_id,
"original_author": original_author,
"reply_text": text,
"reply_tweet_id": reply_id,
"reply_tone": tone,
"hour_of_day": now.hour,
"day_of_week": now.weekday()
}).execute()
return f"Reply posted successfully!\nReply ID: {reply_id}\nURL: https://twitter.com/i/status/{reply_id}"
else:
return f"Error posting reply: {result['data']}"
@mcp.tool()
def engagio_post_tweet(text: str) -> str:
"""Post an original tweet (not a reply)"""
result = twitter_post_tweet(text)
if result["status"] == 201:
tweet_id = result["data"].get("data", {}).get("id")
return f"Tweet posted successfully!\nTweet ID: {tweet_id}\nURL: https://twitter.com/i/status/{tweet_id}"
else:
return f"Error posting tweet: {result['data']}"
@mcp.tool()
def engagio_get_reply_history(days: int = 7) -> str:
"""Get your recent reply history"""
cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
result = supabase.table("replies")\
.select("*")\
.gte("replied_at", cutoff)\
.order("replied_at", desc=True)\
.execute()
if not result.data:
return f"No replies in the last {days} days."
lines = [f"Reply History (last {days} days): {len(result.data)} replies", ""]
for r in result.data:
replied_at = r.get("replied_at", "")[:16] if r.get("replied_at") else ""
lines.append(f"- {replied_at} | @{r.get('original_author', 'unknown')}")
reply_text = r.get("reply_text", "")[:80]
lines.append(f' "{reply_text}{"..." if len(r.get("reply_text", "")) > 80 else ""}"')
if r.get("likes_received") or r.get("replies_received"):
lines.append(f" Performance: {r.get('likes_received', 0)} likes, {r.get('replies_received', 0)} replies")
lines.append("")
return "\n".join(lines)
@mcp.tool()
def engagio_get_tweet(tweet_id: str) -> str:
"""Get full details of a specific tweet by ID"""
result = supabase.table("tweets").select("*").eq("id", tweet_id).execute()
if result.data:
return json.dumps(result.data[0], indent=2, default=str)
return f"Tweet {tweet_id} not in database. Run engagio_fetch_tweets first."
@mcp.tool()
def engagio_search_tweets(query: str, max_results: int = 20, cache_only: bool = False, hours: int = 72) -> str:
"""
Search for tweets matching a query.
COST OPTIMIZATION:
- cache_only=True: Search only cached tweets (FREE)
- cache_only=False: Hit API if cache has < max_results (costs money)
The cache search looks for keywords in tweet text from the last N hours (default: 72).
"""
# First, search the cache
keywords = query.lower().split()
cutoff_time = (datetime.utcnow() - timedelta(hours=hours)).isoformat()
# Search cache using ILIKE for each keyword
cache_query = supabase.table("tweets")\
.select("*")\
.gte("posted_at", cutoff_time)\
.eq("is_retweet", False)\
.order("engagement_score", desc=True)\
.limit(max_results * 2)
cache_result = cache_query.execute()
cached_tweets = []
if cache_result.data:
# Filter by keywords in Python (more flexible than SQL)
for t in cache_result.data:
text_lower = (t.get("text") or "").lower()
if any(kw in text_lower for kw in keywords):
cached_tweets.append(t)
if len(cached_tweets) >= max_results:
break
# If we have enough cached results or cache_only mode, return them
if len(cached_tweets) >= max_results or cache_only:
if not cached_tweets:
if cache_only:
# Check total tweets for better feedback
total_tweets = supabase.table("tweets").select("id", count="exact").execute()
tweet_count = total_tweets.count if hasattr(total_tweets, 'count') else len(total_tweets.data or [])
if tweet_count > 0:
return f"No cached tweets matching '{query}' in the last {hours} hours. You have {tweet_count} tweets cached. Try increasing hours parameter or use cache_only=False."
return f"No cached tweets matching '{query}'. Use cache_only=False to search Twitter API."
return f"No tweets found for query: {query}"
lines = [f"Search results for '{query}' (from cache, FREE):", ""]
for tweet in cached_tweets[:max_results]:
username = tweet.get("author_username", "unknown")
author_followers = tweet.get("author_followers", 0)
text = (tweet.get("text") or "")[:120]
likes = tweet.get("likes", 0)
tweet_id = tweet.get("id")
score = tweet.get("engagement_score", 0)
lines.append(f"@{username} ({author_followers:,}): \"{text}...\"")
lines.append(f" {likes} likes | Score: {score} | ID: {tweet_id}")
lines.append("")
return "\n".join(lines)
# Not enough cached results - hit the API
result = twitterapi_get("tweet/advanced_search", {
"query": query,
"queryType": "Latest"
})
if result.get("status") == "error":
# Fall back to whatever cache we have
if cached_tweets:
lines = [f"API error, showing cached results for '{query}':", ""]
for tweet in cached_tweets:
lines.append(f"@{tweet.get('author_username', 'unknown')}: \"{(tweet.get('text') or '')[:120]}...\"")
lines.append(f" {tweet.get('likes', 0)} likes | ID: {tweet.get('id')}")
lines.append("")
return "\n".join(lines)
return f"Error searching: {result.get('msg', result)}"
tweets = result.get("tweets", [])[:max_results]
if not tweets:
return f"No tweets found for query: {query}"
lines = [f"Search results for '{query}' (from API):", ""]
for tweet in tweets:
author = tweet.get("author", {})
username = author.get("userName", "unknown")
author_followers = author.get("followers", 0)
text = tweet.get("text", "")[:120]
is_retweet = text.startswith("RT @")
likes = tweet.get("likeCount", 0)
tweet_id = tweet.get("id")
score = calculate_engagement_score(tweet, author_followers)
# Cache in database
supabase.table("tweets").upsert({
"id": tweet_id,
"author_username": username,
"author_user_id": author.get("id"),
"text": tweet.get("text", ""),
"likes": tweet.get("likeCount", 0),
"retweets": tweet.get("retweetCount", 0),
"replies": tweet.get("replyCount", 0),
"views": tweet.get("viewCount", 0),
"posted_at": parse_twitter_timestamp(tweet.get("createdAt", "")),
"engagement_score": score,
"is_retweet": is_retweet,
"author_followers": author_followers
}).execute()
lines.append(f"@{username} ({author_followers:,}): \"{text}...\"")
lines.append(f" {likes} likes | Score: {score} | ID: {tweet_id}")
lines.append("")
return "\n".join(lines)
@mcp.tool()
def engagio_get_analytics(days: int = 30) -> str:
"""Get engagement analytics for the past N days"""
cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
# Get all replies in period
replies = supabase.table("replies")\
.select("*")\
.gte("replied_at", cutoff)\
.execute()
if not replies.data:
return f"No reply data in the last {days} days."
total_replies = len(replies.data)
total_likes = sum(r.get("likes_received", 0) for r in replies.data)
total_reply_replies = sum(r.get("replies_received", 0) for r in replies.data)
# Group by author
author_stats: dict[str, dict[str, int]] = {}
for r in replies.data:
author = r.get("original_author", "unknown")
if author not in author_stats:
author_stats[author] = {"count": 0, "likes": 0}
author_stats[author]["count"] += 1
author_stats[author]["likes"] += r.get("likes_received", 0)
# Sort by engagement
top_authors = sorted(author_stats.items(), key=lambda x: x[1]["likes"], reverse=True)[:5]
lines = [
f"Engagement Analytics (last {days} days)",
"",
f"Total Replies: {total_replies}",
f"Total Likes Received: {total_likes}",
f"Total Replies Received: {total_reply_replies}",
f"Avg Likes per Reply: {total_likes / total_replies:.1f}" if total_replies else "",
"",
"Top Accounts Engaged:",
]
for author, stats in top_authors:
lines.append(f" @{author}: {stats['count']} replies, {stats['likes']} likes received")
return "\n".join(lines)
@mcp.tool()
def engagio_update_reply_performance(days: int = 7) -> str:
"""
Fetch current engagement stats for your recent replies and update the database.
Call this periodically to track how your replies are performing.
"""
cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
# Get replies that have a reply_tweet_id (successfully posted)
replies = supabase.table("replies")\
.select("*")\
.gte("replied_at", cutoff)\
.not_.is_("reply_tweet_id", "null")\
.execute()
if not replies.data:
return f"No replies with tweet IDs in the last {days} days."
# Get tweet IDs
tweet_ids = [r["reply_tweet_id"] for r in replies.data if r.get("reply_tweet_id")]
if not tweet_ids:
return "No reply tweet IDs to fetch."
# Fetch current engagement from Twitter
result = twitterapi_get_tweets_by_ids(tweet_ids)
if result.get("status") == "error":
return f"Error fetching tweet data: {result.get('msg')}"
tweets = result.get("tweets", [])
if not tweets:
return "Could not fetch engagement data for replies."
# Create lookup by tweet ID
tweet_data = {t["id"]: t for t in tweets}
# Update each reply's performance
updated = 0
for reply in replies.data:
reply_tweet_id = reply.get("reply_tweet_id")
if reply_tweet_id and reply_tweet_id in tweet_data:
tweet = tweet_data[reply_tweet_id]
likes = tweet.get("likeCount", 0)
reply_count = tweet.get("replyCount", 0)
# Update in Supabase
supabase.table("replies").update({
"likes_received": likes,
"replies_received": reply_count
}).eq("id", reply["id"]).execute()
updated += 1
return f"Updated performance for {updated} replies.\n\nRun engagio_get_analytics() to see your stats."
# ============ Topic Monitoring Tools ============
@mcp.tool()
def engagio_add_topic(topic: str) -> str:
"""Add a topic/hashtag to monitor (e.g., #buildinpublic, AI agents)"""
topic = topic.strip().lower()
topic_type = "hashtag" if topic.startswith("#") else "keyword"
# Check if already exists
existing = supabase.table("monitored_topics").select("*").eq("topic", topic).execute()
if existing.data:
return f"Topic '{topic}' is already being monitored"
supabase.table("monitored_topics").insert({
"topic": topic,
"type": topic_type
}).execute()
return f"Added topic '{topic}' ({topic_type}) to monitoring"
@mcp.tool()
def engagio_remove_topic(topic: str) -> str:
"""Remove a topic from monitoring"""
topic = topic.strip().lower()
result = supabase.table("monitored_topics").delete().eq("topic", topic).execute()
if result.data:
return f"Removed topic '{topic}' from monitoring"
return f"Topic '{topic}' was not being monitored"
@mcp.tool()
def engagio_list_topics() -> str:
"""List all monitored topics"""
result = supabase.table("monitored_topics").select("*").execute()
if not result.data:
return "No topics being monitored. Use engagio_add_topic to add some."
lines = ["Monitored Topics:", ""]
for topic in result.data:
lines.append(f" {topic['topic']} ({topic['type']})")
return "\n".join(lines)
@mcp.tool()
def engagio_fetch_topic_tweets(topic: str | None = None, hours: int = 6) -> str:
"""Fetch tweets for monitored topics using Twitter search"""
if topic:
topics_to_fetch = [{"topic": topic.strip().lower()}]
else:
result = supabase.table("monitored_topics").select("*").execute()
topics_to_fetch = result.data or []
if not topics_to_fetch:
return "No topics to fetch. Add topics first with engagio_add_topic."
total_fetched = 0
for t in topics_to_fetch:
topic_query = t["topic"]
result = twitterapi_get("tweet/advanced_search", {
"query": topic_query,
"queryType": "Latest"
})
# Search endpoint returns {"tweets": [...]} directly, not wrapped in status
if result.get("status") != "error":
tweets = result.get("tweets", [])
for tweet in tweets:
tweet_id = tweet.get("id")
if not tweet_id:
continue
author = tweet.get("author", {})
author_followers = author.get("followers", 0)
text = tweet.get("text", "")
is_retweet = text.startswith("RT @")
score = calculate_engagement_score(tweet, author_followers)
supabase.table("tweets").upsert({
"id": tweet_id,
"author_username": author.get("userName", "unknown"),
"author_user_id": author.get("id"),
"text": text,
"likes": tweet.get("likeCount", 0),
"retweets": tweet.get("retweetCount", 0),
"replies": tweet.get("replyCount", 0),
"views": tweet.get("viewCount", 0),
"posted_at": parse_twitter_timestamp(tweet.get("createdAt", "")),
"engagement_score": score,
"is_retweet": is_retweet,
"author_followers": author_followers
}).execute()
total_fetched += 1
return f"Fetched {total_fetched} tweets for {len(topics_to_fetch)} topic(s)"
# ============ Account ROI Tools ============
@mcp.tool()
def engagio_get_account_roi(days: int = 30) -> str:
"""Get ROI stats per monitored account - which accounts give best engagement"""
cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
# Get all replies with their performance
replies = supabase.table("replies")\
.select("*")\
.gte("replied_at", cutoff)\
.execute()
if not replies.data:
return f"No reply data in the last {days} days."
# Group by author
author_stats: dict[str, dict[str, Any]] = {}
for r in replies.data:
author = r.get("original_author", "unknown")
if author not in author_stats:
author_stats[author] = {
"replies": 0,
"likes": 0,
"reply_replies": 0,
"best_reply": None,
"best_likes": 0
}
author_stats[author]["replies"] += 1
likes = r.get("likes_received", 0)
author_stats[author]["likes"] += likes
author_stats[author]["reply_replies"] += r.get("replies_received", 0)
if likes > author_stats[author]["best_likes"]:
author_stats[author]["best_likes"] = likes
author_stats[author]["best_reply"] = r.get("reply_text", "")[:50]
# Calculate engagement rate and sort
for author, stats in author_stats.items():
stats["engagement_rate"] = stats["likes"] / stats["replies"] if stats["replies"] > 0 else 0
sorted_accounts = sorted(author_stats.items(), key=lambda x: x[1]["engagement_rate"], reverse=True)
# Get follower counts for context
accounts = supabase.table("monitored_accounts").select("username, followers").execute()
followers_map = {a["username"]: a["followers"] for a in (accounts.data or [])}
lines = [f"Account ROI (last {days} days)", "", "Ranked by engagement rate (likes per reply):", ""]
for author, stats in sorted_accounts:
followers = followers_map.get(author, 0)
lines.append(f"@{author} ({followers:,} followers)")
lines.append(f" Replies: {stats['replies']} | Likes: {stats['likes']} | Rate: {stats['engagement_rate']:.1f}")
if stats["best_reply"]:
lines.append(f" Best: \"{stats['best_reply']}...\" ({stats['best_likes']} likes)")
lines.append("")
return "\n".join(lines)
# ============ Reply Queue Tools ============
REPLY_SPACING_MINUTES = 15
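# With 15-minute spacing, engagio_bulk_reply below posts the first reply immediately and
# schedules the rest at T+15, T+30, ..., while engagio_queue_reply appends a new reply
# 15 minutes after the latest queued or posted one.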
@mcp.tool()
def engagio_bulk_reply(replies: list[dict]) -> str:
"""
Post first reply immediately, queue the rest with 15min spacing.
Input: list of {"tweet_id": "...", "text": "..."} dicts.
Example: engagio_bulk_reply([
{"tweet_id": "123", "text": "first reply"},
{"tweet_id": "456", "text": "second reply"}
])
"""
if not replies:
return "No replies provided."
results = []
now = datetime.utcnow()
# Post first one immediately
first = replies[0]
tweet_id = first.get("tweet_id", "")
text = first.get("text", "")
# Get original tweet info
tweet_result = supabase.table("tweets").select("*").eq("id", tweet_id).execute()
original_author = tweet_result.data[0]["author_username"] if tweet_result.data else "unknown"
# Post it
post_result = twitter_post_tweet(text, reply_to_id=tweet_id)
if post_result["status"] == 201:
reply_id = post_result["data"].get("data", {}).get("id")
# Track in replies table
supabase.table("replies").insert({
"original_tweet_id": tweet_id,
"original_author": original_author,
"reply_text": text,
"reply_tweet_id": reply_id,
"hour_of_day": now.hour,
"day_of_week": now.weekday()
}).execute()
results.append(f"✅ @{original_author}: \"{text[:40]}...\" → https://twitter.com/i/status/{reply_id}")
else:
results.append(f"❌ @{original_author}: Failed - {post_result['data']}")
# Queue the rest with spacing
scheduled_time = now
for reply in replies[1:]:
scheduled_time = scheduled_time + timedelta(minutes=REPLY_SPACING_MINUTES)
tweet_id = reply.get("tweet_id", "")
text = reply.get("text", "")
supabase.table("reply_queue").insert({
"tweet_id": tweet_id,
"reply_text": text,
"scheduled_for": scheduled_time.isoformat(),
"status": "pending"
}).execute()
# Get author for display
tweet_result = supabase.table("tweets").select("author_username").eq("id", tweet_id).execute()
author = tweet_result.data[0]["author_username"] if tweet_result.data else "unknown"
results.append(f"⏰ @{author}: \"{text[:40]}...\" → {scheduled_time.strftime('%H:%M')} UTC")
return f"Bulk reply: 1 posted, {len(replies)-1} queued\n\n" + "\n".join(results)
@mcp.tool()
def engagio_queue_reply(tweet_id: str, text: str) -> str:
"""Add a reply to the queue - auto-schedules 15min after last queued/posted reply"""
# Find the latest scheduled time in queue or last posted reply
queue_result = supabase.table("reply_queue")\
.select("scheduled_for")\
.eq("status", "pending")\
.order("scheduled_for", desc=True)\
.limit(1)\
.execute()
replies_result = supabase.table("replies")\
.select("replied_at")\
.order("replied_at", desc=True)\
.limit(1)\
.execute()
now = datetime.utcnow()
last_queue_time = None
last_post_time = None
if queue_result.data:
try:
last_queue_time = datetime.fromisoformat(queue_result.data[0]["scheduled_for"].replace("Z", "+00:00")).replace(tzinfo=None)
except (ValueError, TypeError):
pass
if replies_result.data:
try:
last_post_time = datetime.fromisoformat(replies_result.data[0]["replied_at"].replace("Z", "+00:00")).replace(tzinfo=None)
except (ValueError, TypeError):
pass
# Determine scheduled time
reference_time = max(filter(None, [last_queue_time, last_post_time, now - timedelta(minutes=REPLY_SPACING_MINUTES)]))
scheduled_for = reference_time + timedelta(minutes=REPLY_SPACING_MINUTES)
# If scheduled time is in the past, schedule now
if scheduled_for < now:
scheduled_for = now
supabase.table("reply_queue").insert({
"tweet_id": tweet_id,
"reply_text": text,
"scheduled_for": scheduled_for.isoformat(),
"status": "pending"
}).execute()
wait_minutes = max(0, int((scheduled_for - now).total_seconds() / 60))
return f"Reply queued for {scheduled_for.strftime('%H:%M')} UTC (in {wait_minutes} min)"
@mcp.tool()
def engagio_view_queue() -> str:
"""View pending replies in queue with scheduled times"""
result = supabase.table("reply_queue")\
.select("*")\
.eq("status", "pending")\
.order("scheduled_for")\
.execute()
if not result.data:
return "Reply queue is empty."
lines = ["Reply Queue:", ""]
now = datetime.utcnow()
for i, item in enumerate(result.data, 1):
scheduled = item.get("scheduled_for", "")[:19]
tweet_id = item["tweet_id"]
text = item["reply_text"][:60]
try:
sched_time = datetime.fromisoformat(scheduled)
if sched_time < now:
status = "READY"
else:
mins = int((sched_time - now).total_seconds() / 60)
status = f"in {mins}min"
except (ValueError, TypeError):
status = "?"
lines.append(f"{i}. [{status}] Tweet: {tweet_id}")
lines.append(f" \"{text}...\"")
lines.append("")
return "\n".join(lines)
@mcp.tool()
def engagio_post_next() -> str:
"""Post the next due reply from queue (if time has passed)"""
now = datetime.utcnow()
# Get next pending reply that's due
result = supabase.table("reply_queue")\
.select("*")\
.eq("status", "pending")\
.lte("scheduled_for", now.isoformat())\
.order("scheduled_for")\
.limit(1)\
.execute()
if not result.data:
# Check if there are pending items
pending = supabase.table("reply_queue").select("scheduled_for").eq("status", "pending").limit(1).execute()
if pending.data:
next_time = pending.data[0]["scheduled_for"][:19]
return f"No replies due yet. Next scheduled: {next_time}"
return "Reply queue is empty."
item = result.data[0]
# Post the reply
post_result = engagio_post_reply(item["tweet_id"], item["reply_text"])
if "successfully" in post_result:
# Update queue item status
supabase.table("reply_queue").update({
"status": "posted",
"posted_at": now.isoformat()
}).eq("id", item["id"]).execute()
return f"Posted queued reply!\n{post_result}"
else:
# Mark as failed
supabase.table("reply_queue").update({
"status": "failed"
}).eq("id", item["id"]).execute()
return f"Failed to post queued reply: {post_result}"
@mcp.tool()
def engagio_clear_queue() -> str:
"""Clear all pending replies from queue"""
result = supabase.table("reply_queue").delete().eq("status", "pending").execute()
count = len(result.data) if result.data else 0
return f"Cleared {count} pending replies from queue."
# ============ Thread Detection Tools ============
@mcp.tool()
def engagio_get_thread(tweet_id: str) -> str:
"""Get full thread context for a tweet"""
# First get the tweet
tweet = supabase.table("tweets").select("*").eq("id", tweet_id).execute()
if not tweet.data:
return f"Tweet {tweet_id} not found in database."
current = tweet.data[0]
thread_tweets = [current]
# If it's part of a thread, find the root and all replies
if current.get("is_thread") and current.get("thread_id"):
# Get parent tweets
parent_id = current.get("thread_id")
while parent_id:
parent = supabase.table("tweets").select("*").eq("id", parent_id).execute()
if parent.data:
thread_tweets.insert(0, parent.data[0])
parent_id = parent.data[0].get("thread_id")
else:
break
# Find any child tweets (replies to this tweet by same author)
author = current.get("author_username")
children = supabase.table("tweets")\
.select("*")\
.eq("thread_id", tweet_id)\
.eq("author_username", author)\
.order("posted_at")\
.execute()
if children.data:
thread_tweets.extend(children.data)
if len(thread_tweets) == 1:
return f"This tweet is not part of a thread.\n\n@{author}: \"{current.get('text')}\""
lines = [f"Thread by @{author} ({len(thread_tweets)} tweets):", ""]
for i, t in enumerate(thread_tweets, 1):
text = t.get("text", "")
marker = " << this tweet" if t["id"] == tweet_id else ""
lines.append(f"{i}. \"{text}\"{marker}")
lines.append("")
return "\n".join(lines)
# ============ Timing Analytics Tools ============
@mcp.tool()
def engagio_get_timing_insights(days: int = 30) -> str:
"""Analyze which times your replies perform best"""
cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
replies = supabase.table("replies")\
.select("*")\
.gte("replied_at", cutoff)\
.not_.is_("hour_of_day", "null")\
.execute()
if not replies.data:
return f"No timing data in the last {days} days. Post some replies first."
# Aggregate by hour
hour_stats: dict[int, dict[str, int]] = {}
day_stats: dict[int, dict[str, int]] = {}
for r in replies.data:
hour = r.get("hour_of_day")
day = r.get("day_of_week")
likes = r.get("likes_received", 0)
if hour is not None:
if hour not in hour_stats:
hour_stats[hour] = {"count": 0, "likes": 0}
hour_stats[hour]["count"] += 1
hour_stats[hour]["likes"] += likes
if day is not None:
if day not in day_stats:
day_stats[day] = {"count": 0, "likes": 0}
day_stats[day]["count"] += 1
day_stats[day]["likes"] += likes
# Calculate averages and sort
for stats in hour_stats.values():
stats["avg"] = stats["likes"] / stats["count"] if stats["count"] > 0 else 0
for stats in day_stats.values():
stats["avg"] = stats["likes"] / stats["count"] if stats["count"] > 0 else 0
best_hours = sorted(hour_stats.items(), key=lambda x: x[1]["avg"], reverse=True)[:5]
best_days = sorted(day_stats.items(), key=lambda x: x[1]["avg"], reverse=True)
day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
lines = [f"Timing Insights (last {days} days)", ""]
lines.append("Best Hours (UTC):")
for hour, stats in best_hours:
lines.append(f" {hour:02d}:00 - {stats['avg']:.1f} avg likes ({stats['count']} replies)")
lines.append("")
lines.append("Best Days:")
for day, stats in best_days:
day_name = day_names[day] if 0 <= day < 7 else str(day)
lines.append(f" {day_name} - {stats['avg']:.1f} avg likes ({stats['count']} replies)")
return "\n".join(lines)
# ============ Cost Tracking Tools ============
@mcp.tool()
def engagio_get_cost_report(days: int = 7) -> str:
"""
Get API cost report for the past N days.
Shows credits used, estimated cost, and breakdown by endpoint.
Cost estimate: ~$0.0001 per credit (twitterapi.io pricing)
"""
cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
result = supabase.table("api_usage")\
.select("*")\
.gte("called_at", cutoff)\
.order("called_at", desc=True)\
.execute()
if not result.data:
return f"No API usage recorded in the last {days} days."
# Aggregate stats
total_credits = 0
total_calls = len(result.data)
endpoint_stats: dict[str, dict[str, int]] = {}
daily_credits: dict[str, int] = {}
for usage in result.data:
credits = usage.get("credits_used", 0)
endpoint = usage.get("endpoint", "unknown")
called_at = usage.get("called_at", "")[:10] # Date part only
total_credits += credits
# By endpoint
if endpoint not in endpoint_stats:
endpoint_stats[endpoint] = {"calls": 0, "credits": 0}
endpoint_stats[endpoint]["calls"] += 1
endpoint_stats[endpoint]["credits"] += credits
# By day
if called_at not in daily_credits:
daily_credits[called_at] = 0
daily_credits[called_at] += credits
# Estimate cost (~$0.0001 per credit, or $10 per 100k credits)
estimated_cost = total_credits * 0.0001
lines = [
f"API Cost Report (last {days} days)",
"",
f"Total API Calls: {total_calls}",
f"Total Credits: {total_credits:,}",
f"Estimated Cost: ${estimated_cost:.4f}",
"",
"By Endpoint:",
]
# Sort endpoints by credits used
sorted_endpoints = sorted(endpoint_stats.items(), key=lambda x: x[1]["credits"], reverse=True)
for endpoint, stats in sorted_endpoints:
lines.append(f" {endpoint}: {stats['calls']} calls, {stats['credits']:,} credits")
lines.append("")
lines.append("Daily Usage:")
# Sort by date descending
sorted_days = sorted(daily_credits.items(), reverse=True)[:7] # Last 7 days max
for day, credits in sorted_days:
cost = credits * 0.0001
lines.append(f" {day}: {credits:,} credits (${cost:.4f})")
# Add optimization tips
lines.append("")
lines.append("Optimization Tips:")
# Check for single user calls that could be batched
single_user_calls = endpoint_stats.get("user/info", {}).get("calls", 0)
if single_user_calls > 5:
lines.append(f" - Use engagio_refresh_accounts() instead of individual lookups ({single_user_calls} single calls could be 1 batch)")
# Check batch efficiency
batch_calls = endpoint_stats.get("user/batch_info_by_ids", {}).get("calls", 0)
if batch_calls > 0:
lines.append(f" - Good: Using batch API ({batch_calls} batch calls)")
return "\n".join(lines)
# Run the server
if __name__ == "__main__":
mcp.run()