#!/usr/bin/env python3
"""
Engagio Queue Runner
Polls the reply queue every 5 minutes and posts at most one due reply per poll.
Run in background: nohup python queue_runner.py &
"""
import os
import time
from datetime import datetime
from dotenv import load_dotenv
from supabase import create_client
from requests_oauthlib import OAuth1
import requests
# Run python-dotenv before any of the environment lookups below.
load_dotenv()

# --- Settings ---------------------------------------------------------------
# Seconds to sleep between queue checks (5 minutes).
POLL_INTERVAL = 5 * 60

# Every credential falls back to an empty string when its variable is unset.
SUPABASE_URL = os.environ.get("SUPABASE_URL", "")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY", "")

TWITTER_API_KEY = os.environ.get("TWITTER_API_KEY", "")
TWITTER_API_KEY_SECRET = os.environ.get("TWITTER_API_KEY_SECRET", "")
TWITTER_ACCESS_TOKEN = os.environ.get("TWITTER_ACCESS_TOKEN", "")
TWITTER_ACCESS_TOKEN_SECRET = os.environ.get("TWITTER_ACCESS_TOKEN_SECRET", "")

# --- Shared clients ---------------------------------------------------------
# Built once at import time and reused by every function in this module.
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
twitter_auth = OAuth1(
    TWITTER_API_KEY,
    TWITTER_API_KEY_SECRET,
    TWITTER_ACCESS_TOKEN,
    TWITTER_ACCESS_TOKEN_SECRET,
)
def post_tweet(text: str, reply_to_id: str, *, timeout: float = 30.0) -> dict:
    """Post ``text`` as a reply to the tweet ``reply_to_id``.

    Sends an OAuth1-signed POST to the Twitter v2 ``/2/tweets`` endpoint.

    Args:
        text: Body of the reply.
        reply_to_id: ID of the tweet being replied to.
        timeout: Seconds to wait for Twitter before giving up. Without a
            timeout ``requests`` can block indefinitely, which would stall
            the whole queue runner.

    Returns:
        ``{"status": <HTTP status code>, "data": <decoded JSON body>}``.
        If the response body is not valid JSON, ``data`` is
        ``{"raw": <body text>}`` so the caller still sees the status code.

    Raises:
        requests.RequestException: On connection errors or timeouts.
    """
    url = "https://api.twitter.com/2/tweets"
    payload = {"text": text, "reply": {"in_reply_to_tweet_id": reply_to_id}}
    resp = requests.post(url, auth=twitter_auth, json=payload, timeout=timeout)

    try:
        data = resp.json()
    except ValueError:
        # Error pages from proxies/gateways are often HTML or empty; keep the
        # raw text instead of raising so the status code is not lost.
        data = {"raw": resp.text}

    return {"status": resp.status_code, "data": data}
def process_queue():
    """Post the first due reply from ``reply_queue``, if there is one.

    Selects at most one row whose ``status`` is ``"pending"`` and whose
    ``scheduled_for`` is not later than the current UTC time (ordered by
    ``scheduled_for``), sends it with ``post_tweet`` and then:

    * HTTP 201: marks the row ``"posted"`` (setting ``posted_at``), inserts a
      row into ``replies`` and logs the result.
    * any other status: marks the row ``"failed"`` and logs the response body.

    Returns:
        The ID of the newly created reply tweet, or ``None`` when nothing was
        due or the post was rejected.
    """
    # One timestamp for the whole run so the queue filter, ``posted_at`` and
    # the hour/day statistics all agree with each other.
    now_dt = datetime.utcnow()
    now = now_dt.isoformat()

    # Get due replies
    result = (
        supabase.table("reply_queue")
        .select("*")
        .eq("status", "pending")
        .lte("scheduled_for", now)
        .order("scheduled_for")
        .limit(1)
        .execute()
    )
    if not result.data:
        return None

    item = result.data[0]
    tweet_id = item["tweet_id"]
    text = item["reply_text"]

    # Post the reply
    post_result = post_tweet(text, tweet_id)

    if post_result["status"] != 201:
        # Mark as failed
        supabase.table("reply_queue").update({
            "status": "failed",
        }).eq("id", item["id"]).execute()
        print(f"[{now}] ❌ Failed: {post_result['data']}")
        return None

    # The created tweet is nested under "data" in the response; tolerate an
    # unexpected body shape instead of raising after the tweet is already live.
    body = post_result.get("data")
    created = body.get("data") if isinstance(body, dict) else None
    reply_id = created.get("id") if isinstance(created, dict) else None

    # Update queue status first, before the tracking work below.
    supabase.table("reply_queue").update({
        "status": "posted",
        "posted_at": now,
    }).eq("id", item["id"]).execute()

    # Track in replies table
    tweet_result = (
        supabase.table("tweets")
        .select("author_username")
        .eq("id", tweet_id)
        .execute()
    )
    if tweet_result.data:
        original_author = tweet_result.data[0]["author_username"]
    else:
        original_author = "unknown"

    supabase.table("replies").insert({
        "original_tweet_id": tweet_id,
        "original_author": original_author,
        "reply_text": text,
        "reply_tweet_id": reply_id,
        "hour_of_day": now_dt.hour,
        "day_of_week": now_dt.weekday(),
    }).execute()

    print(f"[{now}] ✅ Posted to @{original_author}: {text[:50]}...")
    print(f" URL: https://twitter.com/i/status/{reply_id}")
    return reply_id
def main():
    """Run the polling loop until interrupted with Ctrl+C.

    Each iteration counts the ``pending`` rows in ``reply_queue``; when there
    is at least one it logs the count and calls ``process_queue()``. The loop
    then sleeps ``POLL_INTERVAL`` seconds, or 60 seconds if the iteration
    raised. Any ``Exception`` is logged and the loop continues.
    """
    print("🚀 Engagio Queue Runner started")
    print(f" Polling every {POLL_INTERVAL}s")
    print(" Press Ctrl+C to stop\n")

    # The KeyboardInterrupt handler wraps the whole loop so that Ctrl+C is
    # handled cleanly even while sleeping in the error back-off.
    try:
        while True:
            try:
                # Check queue
                pending = (
                    supabase.table("reply_queue")
                    .select("id", count="exact")
                    .eq("status", "pending")
                    .execute()
                )
                # ``count`` may be missing or None; fall back to the number of
                # returned rows rather than comparing None with an int.
                count = getattr(pending, "count", None)
                if count is None:
                    count = len(pending.data or [])

                if count > 0:
                    print(f"[{datetime.utcnow().isoformat()}] 📋 {count} pending in queue")
                    process_queue()

                delay = POLL_INTERVAL
            except Exception as e:
                print(f"[{datetime.utcnow().isoformat()}] ⚠️ Error: {e}")
                delay = 60  # Wait a bit on error

            time.sleep(delay)
    except KeyboardInterrupt:
        print("\n👋 Queue runner stopped")


if __name__ == "__main__":
    main()