Skip to main content
Glama

MCP Bluesky Stock Sentiment

example_index.md6.64 kB
``` import { Firehose } from "@atproto/sync"; import { IdResolver } from "@atproto/identity"; import supabase from "./utils/db.js"; const MENTAL_HEALTH_KEYWORDS = [ "mental health", "mental illness", "mental disorder", "mental breakdown", "mental fatigue", "psychological", "emotional health", "emotional support", "cognitive therapy", "psychotherapy", "clinical depression", "depression", "depressed", "hopeless", "worthless", "numb", "empty", "crying", "grief", "mourning", "loss", "low mood", "burnout", "anxiety", "anxious", "panic attack", "panic disorder", "worry", "nervous", "overwhelmed", "racing thoughts", "dread", "tension", "suicide", "suicidal", "self harm", "cutting", "attempted suicide", "taking my life", "ending it all", "thoughts of suicide", "hurting myself", "ptsd", "trauma", "flashbacks", "hypervigilance", "dissociation", "emotional numbness", "abuse trauma", "childhood trauma", "sexual trauma", "bipolar", "ocd", "adhd", "borderline", "schizophrenia", "eating disorder", "anorexia", "bulimia", "personality disorder", "therapy", "counseling", "counsellor", "therapist", "psychologist", "psychiatrist", "meds", "mental health treatment", "support group", "recovery", "mental health app", "insomnia", "sleep disorder", "can’t sleep", "racing mind", "stressed", "stress", "burned out", "sleep paralysis", "i want to die", "i want to end it", "i can’t do this anymore", "life isn’t worth it", "no reason to live", "ending my life", ]; // Precompile keyword regexes const mentalHealthRegexes = MENTAL_HEALTH_KEYWORDS.map( (kw) => new RegExp(`\\b${kw.replace(/[-\/\\^$*+?.()|[\]{}]/g, "\\$&")}\\b`, "i") ); function isMentalHealthPost(text = "") { return mentalHealthRegexes.some((regex) => regex.test(text)); } // === Insert Queue Config === const INSERT_BATCH_SIZE = 100; const INSERT_INTERVAL_MS = 5000; const MAX_QUEUE_LENGTH = 1000; const MAX_RETRIES = 3; let insertQueue = []; let isInserting = false; // // === Optional: Memory Monitor === // setInterval(() => { // const mem = process.memoryUsage(); // console.log(`[MEMORY] RSS: ${(mem.rss / 1024 / 1024).toFixed(2)}MB`); // }, 60000); // === Flush Queue Safely === const flushQueue = async () => { if (insertQueue.length === 0 || isInserting) return; isInserting = true; const batch = insertQueue.splice(0, INSERT_BATCH_SIZE); try { const { error } = await supabase.from("posts_unlabeled").insert(batch); if (error) { console.error("❌ Supabase batch insert error:", error.message); // Mark retries and requeue if within limit batch.forEach((item) => { item._retries = (item._retries || 0) + 1; }); const retryable = batch.filter((item) => item._retries <= MAX_RETRIES); if (retryable.length > 0) { insertQueue.unshift(...retryable); } } else { console.log(`✅ Inserted batch of ${batch.length}`); } } catch (err) { console.error("🔥 Unexpected insert error:", err.stack || err); batch.forEach((item) => { item._retries = (item._retries || 0) + 1; }); const retryable = batch.filter((item) => item._retries <= MAX_RETRIES); if (retryable.length > 0) { insertQueue.unshift(...retryable); } } isInserting = false; // Truncate queue if too large if (insertQueue.length > MAX_QUEUE_LENGTH) { insertQueue = insertQueue.slice(-MAX_QUEUE_LENGTH); } }; setInterval(flushQueue, INSERT_INTERVAL_MS); // === Graceful Shutdown === process.on("SIGINT", async () => { console.log("🛑 Shutting down — flushing remaining posts..."); await flushQueue(); process.exit(0); }); // === Firehose Setup === const idResolver = new IdResolver(); let firehoseStarted = false; const firehose = new Firehose({ service: "wss://bsky.network", idResolver, filterCollections: ["app.bsky.feed.post"], handleEvent: async (evt) => { try { if (evt.event !== "create") return; const post = evt.record; if (!post?.text || post.reply) return; if (post?.$type !== "app.bsky.feed.post") return; if (!isMentalHealthPost(post.text)) return; if (!evt.did) return; const record = { uri: evt.uri.toString(), did: evt.did, text: post.text, created_at: post.createdAt || evt.time, langs: post.langs || [], facets: post.facets || null, reply: null, embed: post.embed || null, ingestion_time: new Date().toISOString(), }; insertQueue.push(record); if (insertQueue.length >= INSERT_BATCH_SIZE * 2) { console.log("⚠️ Queue large — flushing early..."); await flushQueue(); } } catch (err) { console.error("🔥 Event handler error:", err.stack || err); } }, onError: (err) => { console.error("🔥 Firehose stream error:", err.stack || err); }, }); if (!firehoseStarted) { firehoseStarted = true; firehose.start(); console.log("🚀 Firehose started."); } ``` ```python STOCK_KEYWORDS = [ "stock", "stocks", "equity", "equities", "invest", "investment", "investing", "buy", "sell", "bull", "bear", "market", "ticker", "finance", "trading", "portfolio", "nasdaq", "nyse", "dow", "s&p", "earnings", "ipo", "dividend", "split", "buyback", "analyst", "upgrade", "downgrade", "quarterly", "profit", "loss", "option", "call", "put" ] import re stock_regexes = [re.compile(rf"\b{re.escape(kw)}\b", re.IGNORECASE) for kw in STOCK_KEYWORDS] def is_stock_post(text): return any(rx.search(text or "") for rx in stock_regexes) from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message, models, CAR def on_message_handler(message): commit = parse_subscribe_repos_message(message) if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit): return if not commit.blocks: return car = CAR.from_bytes(commit.blocks) for op in commit.ops: if op.action == "create" and op.cid: data = car.blocks.get(op.cid) if data.get("$type") == "app.bsky.feed.post": text = data.get("text", "") if is_stock_post(text) and not data.get("reply"): # Exclude replies print(f"Stock Post: {text} | Author: {data.get('creator')}") # Optionally, append to a batch list for later insertion (e.g. into Supabase) client = FirehoseSubscribeReposClient() client.start(on_message_handler) ```

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Tar-ive/mcp-bluesky-stock-sentiment'

If you have feedback or need assistance with the MCP directory API, please join our Discord server