find_top_posts
Fetch and filter top WallStreetBets posts by score, comments, and flairs for targeted market insights. Returns a curated dataset optimized for analysis, cached for 5 minutes.
Instructions
Fetch and filter WSB posts based on criteria. Caches results for 5 minutes.
Args:
min_score: Minimum score (upvotes) required
min_comments: Minimum number of comments required
limit: Maximum number of posts to return
excluded_flairs: List of post flairs to exclude. Defaults to ["Meme", "Shitpost", "Gain", "Loss"].
Returns:
A dictionary with filtered posts data
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| excluded_flairs | No | ||
| limit | No | ||
| min_comments | No | ||
| min_score | No |
Implementation Reference
- server.py:118-210 (handler)The core handler function implementing the 'find_top_posts' tool. It is decorated with @mcp.tool() for registration in the MCP server. Fetches top hot posts from r/wallstreetbets, filters by min_score, min_comments, excludes specified flairs, uses a min-heap to select top limit posts by score, includes caching, and supports progress reporting.@mcp.tool() async def find_top_posts(min_score: int = 100, min_comments: int = 10, limit: int = 10, excluded_flairs: list[str] = ["Meme", "Shitpost", "Gain", "Loss"], ctx: Context = None) -> dict: """ Fetch and filter WSB posts based on criteria. Caches results for 5 minutes. Args: min_score: Minimum score (upvotes) required min_comments: Minimum number of comments required limit: Maximum number of posts to return excluded_flairs: List of post flairs to exclude. Defaults to ["Meme", "Shitpost", "Gain", "Loss"]. Returns: A dictionary with filtered posts data """ # --- Cache Check --- # Sort flairs to ensure consistent key regardless of order excluded_flairs_tuple = tuple(sorted(excluded_flairs)) cache_key = f"find_top_posts:{min_score}:{min_comments}:{limit}:{excluded_flairs_tuple}" current_time = time.time() if cache_key in CACHE_DATA and current_time < CACHE_EXPIRY.get(cache_key, 0): logger.info(f"Cache hit for {cache_key}") return CACHE_DATA[cache_key] logger.info(f"Cache miss for {cache_key}") # --- End Cache Check --- try: if ctx: await ctx.report_progress(0, 2) reddit = await get_reddit_client() if not reddit: return {"error": "Unable to connect to Reddit API. Check your credentials."} try: # Fetch posts if ctx: await ctx.report_progress(1, 2) subreddit = await reddit.subreddit("wallstreetbets") hot_posts = subreddit.hot(limit=50) top_posts_heap = [] # Min-heap storing (score, post_dict) async for post in hot_posts: # Filter if post.score >= min_score and \ post.num_comments >= min_comments and \ (post.link_flair_text or "") not in excluded_flairs: post_data = { "id": post.id, "url": f"https://www.reddit.com{post.permalink}", "title": post.title, "selftext": post.selftext, "score": post.score, "num_comments": post.num_comments, "upvote_ratio": post.upvote_ratio, "link_flair_text": post.link_flair_text or "", "created_utc": post.created_utc } if len(top_posts_heap) < limit: heapq.heappush(top_posts_heap, (post.score, post_data)) elif post.score > top_posts_heap[0][0]: # Compare with min score in heap # If current post is better than the worst in the heap, replace it heapq.heapreplace(top_posts_heap, (post.score, post_data)) # Extract posts from heap and sort descending by score # The heap contains the top 'limit' posts based on score, but not necessarily sorted top_posts = sorted([item[1] for item in top_posts_heap], key=lambda x: x['score'], reverse=True) logger.info(f"Processed posts, selected top {len(top_posts)} posts meeting criteria") if ctx: await ctx.report_progress(2, 2) result = { "count": len(top_posts), "posts": top_posts } # --- Cache Store --- CACHE_DATA[cache_key] = result CACHE_EXPIRY[cache_key] = current_time + CACHE_TTL logger.info(f"Cached result for {cache_key} with TTL {CACHE_TTL}s") # --- End Cache Store --- return result finally: await reddit.close() except Exception as e: logger.error(f"Error in fetch_posts: {str(e)}") return {"error": f"Failed to fetch posts: {str(e)}"}
- server.py:118-118 (registration)The @mcp.tool() decorator registers the find_top_posts function as an MCP tool.@mcp.tool()
- server.py:119-131 (schema)Input schema defined by function parameters with type hints and defaults, plus detailed docstring describing args and return type.async def find_top_posts(min_score: int = 100, min_comments: int = 10, limit: int = 10, excluded_flairs: list[str] = ["Meme", "Shitpost", "Gain", "Loss"], ctx: Context = None) -> dict: """ Fetch and filter WSB posts based on criteria. Caches results for 5 minutes. Args: min_score: Minimum score (upvotes) required min_comments: Minimum number of comments required limit: Maximum number of posts to return excluded_flairs: List of post flairs to exclude. Defaults to ["Meme", "Shitpost", "Gain", "Loss"]. Returns: A dictionary with filtered posts data """