Skip to main content
Glama
ferdousbhai

WSB Analyst MCP Server

find_top_posts

Filter WallStreetBets posts by upvote score, comment count, and flair to identify relevant discussions for market analysis.

Instructions

Fetch and filter WSB posts based on criteria. Caches results for 5 minutes.

Args:
    min_score: Minimum score (upvotes) required
    min_comments: Minimum number of comments required
    limit: Maximum number of posts to return
    excluded_flairs: List of post flairs to exclude. Defaults to ["Meme", "Shitpost", "Gain", "Loss"].

Returns:
    A dictionary with filtered posts data

Input Schema

Table / JSON Schema

| Name            | Required | Description                          | Default                                |
|-----------------|----------|--------------------------------------|----------------------------------------|
| min_score       | No       | Minimum score (upvotes) required     | 100                                    |
| min_comments    | No       | Minimum number of comments required  | 10                                     |
| limit           | No       | Maximum number of posts to return    | 10                                     |
| excluded_flairs | No       | List of post flairs to exclude       | ["Meme", "Shitpost", "Gain", "Loss"]   |

Implementation Reference

  • The handler function decorated with @mcp.tool(), which registers and implements the 'find_top_posts' MCP tool. It fetches top posts from r/wallstreetbets subreddit using asyncpraw, applies filters for minimum score, comments, and excluded flairs, uses a heap to select top N posts, caches results, and returns a dictionary with post details.
    @mcp.tool()
    async def find_top_posts(min_score: int = 100, min_comments: int = 10, limit: int = 10, excluded_flairs: list[str] | None = None, ctx: Context = None) -> dict:
        """
        Fetch and filter WSB posts based on criteria. Caches results for 5 minutes.
    
        Args:
            min_score: Minimum score (upvotes) required
            min_comments: Minimum number of comments required
            limit: Maximum number of posts to return
            excluded_flairs: List of post flairs to exclude. Defaults to ["Meme", "Shitpost", "Gain", "Loss"].
            ctx: Optional MCP context used for progress reporting.
    
        Returns:
            A dictionary with filtered posts data, or {"error": ...} on failure
        """
        # Resolve the default here instead of in the signature to avoid the
        # mutable-default-argument pitfall (a list default is shared across calls).
        if excluded_flairs is None:
            excluded_flairs = ["Meme", "Shitpost", "Gain", "Loss"]

        # --- Cache Check ---
        # Sort flairs to ensure consistent key regardless of order
        excluded_flairs_tuple = tuple(sorted(excluded_flairs))
        cache_key = f"find_top_posts:{min_score}:{min_comments}:{limit}:{excluded_flairs_tuple}"
        current_time = time.time()
        if cache_key in CACHE_DATA and current_time < CACHE_EXPIRY.get(cache_key, 0):
            logger.info(f"Cache hit for {cache_key}")
            return CACHE_DATA[cache_key]
        logger.info(f"Cache miss for {cache_key}")
        # --- End Cache Check ---
    
        try:
            if ctx:
                await ctx.report_progress(0, 2)
    
            reddit = await get_reddit_client()
            if not reddit:
                return {"error": "Unable to connect to Reddit API. Check your credentials."}
    
            try:
                # Fetch posts
                if ctx:
                    await ctx.report_progress(1, 2)
    
                subreddit = await reddit.subreddit("wallstreetbets")
                hot_posts = subreddit.hot(limit=50)
    
                # Min-heap of (score, tiebreak, post_dict). The monotonically
                # increasing tiebreak guarantees heapq never has to compare two
                # post dicts directly, which would raise TypeError whenever two
                # posts share the same score.
                top_posts_heap = []
                tiebreak = 0
    
                async for post in hot_posts:
                    # Filter
                    if post.score >= min_score and \
                       post.num_comments >= min_comments and \
                       (post.link_flair_text or "") not in excluded_flairs:
    
                        post_data = {
                            "id": post.id,
                            "url": f"https://www.reddit.com{post.permalink}",
                            "title": post.title,
                            "selftext": post.selftext,
                            "score": post.score,
                            "num_comments": post.num_comments,
                            "upvote_ratio": post.upvote_ratio,
                            "link_flair_text": post.link_flair_text or "",
                            "created_utc": post.created_utc
                        }
    
                        if len(top_posts_heap) < limit:
                            heapq.heappush(top_posts_heap, (post.score, tiebreak, post_data))
                        elif post.score > top_posts_heap[0][0]: # Compare with min score in heap
                            # If current post is better than the worst in the heap, replace it
                            heapq.heapreplace(top_posts_heap, (post.score, tiebreak, post_data))
                        tiebreak += 1
    
                # Extract posts from heap and sort descending by score
                # The heap contains the top 'limit' posts based on score, but not necessarily sorted
                top_posts = sorted((item[2] for item in top_posts_heap), key=lambda x: x['score'], reverse=True)
    
                logger.info(f"Processed posts, selected top {len(top_posts)} posts meeting criteria")
    
                if ctx:
                    await ctx.report_progress(2, 2)
    
                result = {
                    "count": len(top_posts),
                    "posts": top_posts
                }
    
                # --- Cache Store ---
                CACHE_DATA[cache_key] = result
                CACHE_EXPIRY[cache_key] = current_time + CACHE_TTL
                logger.info(f"Cached result for {cache_key} with TTL {CACHE_TTL}s")
                # --- End Cache Store ---
    
                return result
            finally:
                # Always release the asyncpraw client session, even on error.
                await reddit.close()
        except Exception as e:
            logger.error(f"Error in find_top_posts: {str(e)}")
            return {"error": f"Failed to fetch posts: {str(e)}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ferdousbhai/wsb-analyst-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server