# fetch_detailed_wsb_posts
Retrieve detailed WallStreetBets posts with filtering options for score, comments, and flairs to analyze market sentiment and discussions.
## Instructions
Fetch and filter WSB posts, then get detailed information including top comments and links for each.
Args:
- `min_score`: Minimum score (upvotes) required
- `min_comments`: Minimum number of comments required
- `limit`: Maximum number of posts to return
- `excluded_flairs`: List of post flairs to exclude. Defaults to `["Meme", "Shitpost", "Gain", "Loss"]`.

Returns: A dictionary with detailed data for the filtered posts.
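For orientation, the returned dictionary follows the structure built by `fetch_batch_post_details` (see the Implementation Reference below): a `count` plus a `posts` mapping keyed by post ID. The field names here come from the implementation; the values are invented for illustration:

```python
# Illustrative return value; field names match the implementation, values are made up.
{
    "count": 1,
    "posts": {
        "abc123": {
            "post_id": "abc123",
            "url": "https://www.reddit.com/r/wallstreetbets/comments/abc123/example/",
            "title": "Example DD post",
            "selftext": "Long-form analysis text...",
            "upvote_ratio": 0.91,
            "score": 4200,
            "link_flair_text": "DD",
            "top_comments": [
                {"content": "Interesting thesis", "score": 350, "author": "some_user"}
            ],
            "extracted_links": ["https://example.com/earnings-report"]
        }
    }
}
```

On failure, the tool instead returns a dictionary with a single `error` key, as the handler code below shows.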
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| min_score | No | Minimum score (upvotes) required | 100 |
| min_comments | No | Minimum number of comments required | 10 |
| limit | No | Maximum number of posts to return | 10 |
| excluded_flairs | No | List of post flairs to exclude | ["Meme", "Shitpost", "Gain", "Loss"] |
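All parameters are optional, so the tool can be called with no arguments to use the defaults above. Below is a minimal sketch of invoking it from the MCP Python client SDK over stdio; the launch command and file name (`python server.py`) are assumptions, not taken from this page:

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Assumed launch command; adjust to however the server is actually started.
server_params = StdioServerParameters(command="python", args=["server.py"])

async def main() -> None:
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Tool and argument names come from the schema above.
            result = await session.call_tool(
                "fetch_detailed_wsb_posts",
                arguments={"min_score": 500, "min_comments": 50, "limit": 5},
            )
            print(result.content)

asyncio.run(main())
```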
## Implementation Reference
- `server.py:342-398` (handler) — The core handler function for the `fetch_detailed_wsb_posts` tool. It is registered via the `@mcp.tool()` decorator and orchestrates fetching top WSB posts and their detailed content, including comments and external links.

```python
@mcp.tool()
async def fetch_detailed_wsb_posts(min_score: int = 100, min_comments: int = 10, limit: int = 10,
                                    excluded_flairs: list[str] = ["Meme", "Shitpost", "Gain", "Loss"],
                                    ctx: Context = None) -> dict:
    """
    Fetch and filter WSB posts, then get detailed information including top comments and links for each.

    Args:
        min_score: Minimum score (upvotes) required
        min_comments: Minimum number of comments required
        limit: Maximum number of posts to return
        excluded_flairs: List of post flairs to exclude. Defaults to ["Meme", "Shitpost", "Gain", "Loss"].

    Returns:
        A dictionary with detailed data for the filtered posts.
    """
    if ctx:
        await ctx.report_progress(0, 3)

    # Step 1: Fetch initial posts based on criteria
    posts_result = await find_top_posts(
        min_score=min_score,
        min_comments=min_comments,
        limit=limit,
        excluded_flairs=excluded_flairs,
        ctx=None  # Don't pass context down, manage progress here
    )

    if "error" in posts_result:
        logger.error(f"Error during initial post fetch in fetch_detailed_wsb_posts: {posts_result['error']}")
        if ctx:
            await ctx.report_progress(3, 3)
        return {"error": f"Failed during initial post fetch: {posts_result['error']}"}

    if not posts_result["posts"]:
        logger.info("No posts found matching criteria in fetch_detailed_wsb_posts.")
        if ctx:
            await ctx.report_progress(3, 3)
        return {"count": 0, "posts": {}}

    post_ids = [post["id"] for post in posts_result["posts"]]
    logger.info(f"Found {len(post_ids)} posts matching criteria, fetching details...")

    if ctx:
        await ctx.report_progress(1, 3)

    # Step 2: Fetch detailed information for the filtered posts
    # Pass the context down to fetch_batch_post_details for finer-grained progress within that step
    details_result = await fetch_batch_post_details(post_ids=post_ids, ctx=ctx)  # Pass context here

    if "error" in details_result:
        logger.error(f"Error during batch detail fetch in fetch_detailed_wsb_posts: {details_result['error']}")
        # Progress reporting is handled within fetch_batch_post_details if ctx is passed
        return {"error": f"Failed during batch detail fetch: {details_result['error']}"}

    # Progress reporting completion is handled within fetch_batch_post_details
    logger.info(f"Successfully fetched details for {len(details_result.get('posts', {}))} posts.")
    return details_result  # Return the structure from fetch_batch_post_details
```
- `server.py:24-37` (schema) — Pydantic `BaseModel` schemas for `Comment` and `Post` objects used in the detailed post data returned by the tool.

```python
class Comment(BaseModel):
    content: str
    score: int
    author: str


class Post(BaseModel):
    url: str
    title: str
    selftext: str
    upvote_ratio: float
    link_flair_text: str
    top_comments: list[Comment] = Field(default_factory=list)
    extracted_links: list[str] = Field(default_factory=list)
```
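The handler itself returns plain dictionaries; these models document the shape of each post entry. A small sketch of validating one entry against `Post`, assuming Pydantic v2 (use `Post.parse_obj` on v1); the sample values are invented:

```python
# Hypothetical entry, shaped like one value of the tool's "posts" mapping.
entry = {
    "url": "https://www.reddit.com/r/wallstreetbets/comments/abc123/example/",
    "title": "Example DD post",
    "selftext": "",
    "upvote_ratio": 0.91,
    "link_flair_text": "DD",
    "top_comments": [{"content": "Interesting thesis", "score": 350, "author": "some_user"}],
    "extracted_links": [],
}
post = Post.model_validate(entry)  # extra keys such as "post_id" and "score" are ignored by default
print(post.title, post.top_comments[0].score)
```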
- `server.py:311-340` (helper) — Helper tool `fetch_batch_post_details`, called by the main handler to fetch details for multiple posts.

```python
async def fetch_batch_post_details(post_ids: list[str], ctx: Context = None) -> dict:
    """
    Fetch details for multiple posts efficiently.

    Args:
        post_ids: List of Reddit post IDs

    Returns:
        Dictionary with details for all requested posts
    """
    if not post_ids:
        return {"error": "No post IDs provided"}

    results = {}
    total = len(post_ids)

    for i, post_id in enumerate(post_ids):
        if ctx:
            await ctx.report_progress(i, total)
        detail = await fetch_post_details(post_id)
        results[post_id] = detail

    if ctx:
        await ctx.report_progress(total, total)

    return {
        "count": len(results),
        "posts": results
    }
```
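The helper above fetches details one post at a time so it can report per-item progress. Because `fetch_post_details` is a coroutine, a concurrent variant is also possible; this is a sketch of an alternative, not the project's implementation, and it drops the progress reporting:

```python
import asyncio

async def fetch_batch_post_details_concurrent(post_ids: list[str]) -> dict:
    """Hypothetical variant: fetch all post details concurrently."""
    if not post_ids:
        return {"error": "No post IDs provided"}
    # Note: each fetch_post_details call opens its own Reddit client,
    # so a large batch may hit rate limits faster than the sequential version.
    details = await asyncio.gather(*(fetch_post_details(pid) for pid in post_ids))
    return {"count": len(post_ids), "posts": dict(zip(post_ids, details))}
```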
- `server.py:213-308` (helper) — Core helper `fetch_post_details`, which retrieves detailed post information, top comments, and external links for a single post ID. Called indirectly via `fetch_batch_post_details`.

```python
async def fetch_post_details(post_id: str, ctx: Context = None) -> dict:
    """
    Fetch detailed information about a specific WSB post including top comments. Caches results for 5 minutes.

    Args:
        post_id: Reddit post ID

    Returns:
        Detailed post data including comments and extracted links
    """
    # --- Cache Check ---
    cache_key = f"fetch_post_details:{post_id}"
    current_time = time.time()
    if cache_key in CACHE_DATA and current_time < CACHE_EXPIRY.get(cache_key, 0):
        logger.info(f"Cache hit for {cache_key}")
        return CACHE_DATA[cache_key]
    logger.info(f"Cache miss for {cache_key}")
    # --- End Cache Check ---

    try:
        if ctx:
            await ctx.report_progress(0, 3)

        reddit = await get_reddit_client()
        if not reddit:
            return {"error": "Unable to connect to Reddit API. Check your credentials."}

        try:
            if ctx:
                await ctx.report_progress(1, 3)

            submission = await reddit.submission(id=post_id)

            # Load comments
            if ctx:
                await ctx.report_progress(2, 3)

            await submission.comments.replace_more(limit=0)
            comments = await submission.comments.list()
            top_comments = sorted(comments, key=lambda c: c.score, reverse=True)[:10]

            # Extract links
            content_links = []
            if not submission.is_self and is_valid_external_link(submission.url):
                content_links.append(submission.url)
            elif submission.is_self:
                content_links.extend(extract_valid_links(submission.selftext))

            # Process comments
            comment_links = []
            comment_data = []

            for comment in top_comments:
                try:
                    author_name = comment.author.name if comment.author else "[deleted]"
                    links_in_comment = extract_valid_links(comment.body)
                    if links_in_comment:
                        comment_links.extend(links_in_comment)

                    comment_data.append({
                        "content": comment.body,
                        "score": comment.score,
                        "author": author_name
                    })
                except Exception as e:
                    logger.warning(f"Error processing comment: {str(e)}")

            # Combine all found links
            all_links = list(set(content_links + comment_links))

            result = {
                "post_id": post_id,
                "url": f"https://www.reddit.com{submission.permalink}",
                "title": submission.title,
                "selftext": submission.selftext if submission.is_self else "",
                "upvote_ratio": submission.upvote_ratio,
                "score": submission.score,
                "link_flair_text": submission.link_flair_text or "",
                "top_comments": comment_data,
                "extracted_links": all_links
            }

            # --- Cache Store ---
            CACHE_DATA[cache_key] = result
            CACHE_EXPIRY[cache_key] = current_time + CACHE_TTL
            logger.info(f"Cached result for {cache_key} with TTL {CACHE_TTL}s")
            # --- End Cache Store ---

            if ctx:
                await ctx.report_progress(3, 3)

            return result
        finally:
            await reddit.close()
    except Exception as e:
        logger.error(f"Error in fetch_post_details: {str(e)}")
        return {"error": f"Failed to fetch post details: {str(e)}"}
```
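The module-level names used here (`CACHE_DATA`, `CACHE_EXPIRY`, `CACHE_TTL`, `get_reddit_client`, and the link helpers) fall outside the excerpted line ranges. A minimal sketch of the cache globals and an asyncpraw-based client factory consistent with the usage above; the environment variable names are assumptions, and the 300-second TTL simply follows the "5 minutes" stated in the docstrings:

```python
import os
import asyncpraw

# Simple in-memory cache: CACHE_DATA holds results, CACHE_EXPIRY holds absolute expiry timestamps.
CACHE_DATA: dict[str, dict] = {}
CACHE_EXPIRY: dict[str, float] = {}
CACHE_TTL = 300  # seconds; "caches results for 5 minutes" per the docstrings

async def get_reddit_client():
    """Sketch: build a read-only async Reddit client, or return None if credentials are missing."""
    try:
        return asyncpraw.Reddit(
            client_id=os.environ["REDDIT_CLIENT_ID"],          # assumed variable name
            client_secret=os.environ["REDDIT_CLIENT_SECRET"],  # assumed variable name
            user_agent=os.environ.get("REDDIT_USER_AGENT", "wsb-analysis-mcp"),
        )
    except Exception:
        return None
```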
- `server.py:118-210` (helper) — Helper tool `find_top_posts`, used by the main handler to initially filter and select the top WSB posts by score, comment count, and flair.

```python
@mcp.tool()
async def find_top_posts(min_score: int = 100, min_comments: int = 10, limit: int = 10,
                         excluded_flairs: list[str] = ["Meme", "Shitpost", "Gain", "Loss"],
                         ctx: Context = None) -> dict:
    """
    Fetch and filter WSB posts based on criteria. Caches results for 5 minutes.

    Args:
        min_score: Minimum score (upvotes) required
        min_comments: Minimum number of comments required
        limit: Maximum number of posts to return
        excluded_flairs: List of post flairs to exclude. Defaults to ["Meme", "Shitpost", "Gain", "Loss"].

    Returns:
        A dictionary with filtered posts data
    """
    # --- Cache Check ---
    # Sort flairs to ensure consistent key regardless of order
    excluded_flairs_tuple = tuple(sorted(excluded_flairs))
    cache_key = f"find_top_posts:{min_score}:{min_comments}:{limit}:{excluded_flairs_tuple}"
    current_time = time.time()
    if cache_key in CACHE_DATA and current_time < CACHE_EXPIRY.get(cache_key, 0):
        logger.info(f"Cache hit for {cache_key}")
        return CACHE_DATA[cache_key]
    logger.info(f"Cache miss for {cache_key}")
    # --- End Cache Check ---

    try:
        if ctx:
            await ctx.report_progress(0, 2)

        reddit = await get_reddit_client()
        if not reddit:
            return {"error": "Unable to connect to Reddit API. Check your credentials."}

        try:
            # Fetch posts
            if ctx:
                await ctx.report_progress(1, 2)

            subreddit = await reddit.subreddit("wallstreetbets")
            hot_posts = subreddit.hot(limit=50)

            top_posts_heap = []  # Min-heap storing (score, post_dict)

            async for post in hot_posts:
                # Filter
                if post.score >= min_score and \
                   post.num_comments >= min_comments and \
                   (post.link_flair_text or "") not in excluded_flairs:

                    post_data = {
                        "id": post.id,
                        "url": f"https://www.reddit.com{post.permalink}",
                        "title": post.title,
                        "selftext": post.selftext,
                        "score": post.score,
                        "num_comments": post.num_comments,
                        "upvote_ratio": post.upvote_ratio,
                        "link_flair_text": post.link_flair_text or "",
                        "created_utc": post.created_utc
                    }

                    if len(top_posts_heap) < limit:
                        heapq.heappush(top_posts_heap, (post.score, post_data))
                    elif post.score > top_posts_heap[0][0]:  # Compare with min score in heap
                        # If current post is better than the worst in the heap, replace it
                        heapq.heapreplace(top_posts_heap, (post.score, post_data))

            # Extract posts from heap and sort descending by score
            # The heap contains the top 'limit' posts based on score, but not necessarily sorted
            top_posts = sorted([item[1] for item in top_posts_heap], key=lambda x: x['score'], reverse=True)

            logger.info(f"Processed posts, selected top {len(top_posts)} posts meeting criteria")

            if ctx:
                await ctx.report_progress(2, 2)

            result = {
                "count": len(top_posts),
                "posts": top_posts
            }

            # --- Cache Store ---
            CACHE_DATA[cache_key] = result
            CACHE_EXPIRY[cache_key] = current_time + CACHE_TTL
            logger.info(f"Cached result for {cache_key} with TTL {CACHE_TTL}s")
            # --- End Cache Store ---

            return result
        finally:
            await reddit.close()
    except Exception as e:
        logger.error(f"Error in fetch_posts: {str(e)}")
        return {"error": f"Failed to fetch posts: {str(e)}"}
```
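One detail worth noting in the heap logic above: `heapq` compares tuples element by element, so if two posts have the same score the comparison falls through to the `post_data` dictionaries and raises `TypeError`. A common remedy (not part of the code shown) is to add a monotonically increasing counter as a tie-breaker:

```python
import heapq
import itertools

counter = itertools.count()  # unique, always-comparable tie-breaker
top_posts_heap: list[tuple[int, int, dict]] = []

def push_post(score: int, post_data: dict, limit: int) -> None:
    """Keep at most `limit` highest-scoring posts; ties never reach the dict comparison."""
    entry = (score, next(counter), post_data)
    if len(top_posts_heap) < limit:
        heapq.heappush(top_posts_heap, entry)
    elif score > top_posts_heap[0][0]:
        heapq.heapreplace(top_posts_heap, entry)
```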