scrape_subreddit
Retrieve posts, comments, and media from any subreddit. Get titles, authors, scores, and more without API keys.
Instructions
Scrape posts from a subreddit. Returns post data including titles, authors, scores, comments, and media URLs.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| subreddit | Yes | Name of the subreddit to scrape (without r/) | |
| limit | No | Maximum number of posts to scrape | 100 |
| download_media | No | Whether to download images and videos | false |
| scrape_comments | No | Whether to scrape comments | true |
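Only `subreddit` is required; the other fields fall back to the defaults above. As a quick illustration, the sketch below calls the handler directly with schema-conformant arguments. It assumes the package is installed so that `mcp_reddit.server` is importable, and `"python"` is just an example subreddit, not a project default.

```python
import asyncio

# Minimal sketch, assuming mcp_reddit.server is importable in this environment
# and exposes the scrape_subreddit handler shown under Implementation Reference.
from mcp_reddit.server import scrape_subreddit

result = asyncio.run(
    scrape_subreddit(
        subreddit="python",       # scraped as r/python (no "r/" prefix)
        limit=25,                 # stop after roughly 25 posts
        download_media=False,     # skip image/video downloads
        scrape_comments=True,     # also collect comment threads
    )
)
print(result["success"], result.get("posts_scraped"), result.get("data_location"))
```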
Implementation Reference
- src/mcp_reddit/server.py:365-408 (handler): The main handler function for the scrape_subreddit tool. Calls run_scraper via run_in_executor, then reads the resulting CSV to return post data.
```python
async def scrape_subreddit(
    subreddit: str, limit: int, download_media: bool, scrape_comments: bool
) -> dict:
    """Scrape a subreddit."""
    try:
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(
            None,
            run_scraper,
            subreddit,
            limit,
            False,  # is_user
            download_media,
            scrape_comments,
            DATA_DIR,
        )

        prefix = "r"
        base_dir = f"{DATA_DIR}/{prefix}_{subreddit}"
        posts_file = f"{base_dir}/posts.csv"

        if os.path.exists(posts_file):
            df = pd.read_csv(posts_file)
            recent_posts = df.tail(min(limit, len(df))).to_dict("records")
            return {
                "success": True,
                "subreddit": subreddit,
                "posts_scraped": result.get("posts", 0),
                "comments_scraped": result.get("comments", 0),
                "duration_seconds": result.get("duration", 0),
                "recent_posts": recent_posts[:10],
                "total_posts_in_db": len(df),
                "data_location": base_dir,
            }

        return {
            "success": True,
            "message": "Scrape completed but no data file found",
            "result": result,
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
```
- src/mcp_reddit/server.py:35-61 (schema): The tool schema/input validation definition for scrape_subreddit, registered via the Tool object with name, description, and inputSchema.
name="scrape_subreddit", description="Scrape posts from a subreddit. Returns post data including titles, authors, scores, comments, and media URLs.", inputSchema={ "type": "object", "properties": { "subreddit": { "type": "string", "description": "Name of the subreddit to scrape (without r/)", }, "limit": { "type": "integer", "description": "Maximum number of posts to scrape (default: 100)", "default": 100, }, "download_media": { "type": "boolean", "description": "Whether to download images and videos (default: false)", "default": False, }, "scrape_comments": { "type": "boolean", "description": "Whether to scrape comments (default: true)", "default": True, }, }, "required": ["subreddit"], }, - src/mcp_reddit/server.py:284-295 (registration)The call_tool handler that routes the 'scrape_subreddit' tool name to the scrape_subreddit async function.
```python
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """Handle tool execution."""
    try:
        if name == "scrape_subreddit":
            result = await scrape_subreddit(
                arguments["subreddit"],
                arguments.get("limit", 100),
                arguments.get("download_media", False),
                arguments.get("scrape_comments", True),
            )
            return [TextContent(type="text", text=json.dumps(result, indent=2))]
```
- src/mcp_reddit/scraper.py:338-514 (helper): The core async scraping logic (scrape_async) and its sync wrapper (run_scraper) that does the actual HTTP fetching and CSV persistence.
```python
async def scrape_async(
    target,
    limit=100,
    is_user=False,
    download_media=True,
    scrape_comments=True,
    data_dir="data",
):
    """
    Main async scraping function.

    Args:
        target: Subreddit or username
        limit: Max posts to scrape
        is_user: True if scraping a user
        download_media: Download images/videos
        scrape_comments: Scrape comments
        data_dir: Directory to store data
    """
    global semaphore
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)

    prefix = "u" if is_user else "r"

    # Setup directories
    base_dir = f"{data_dir}/{prefix}_{target}"
    media_dir = f"{base_dir}/media"
    images_dir = f"{media_dir}/images"
    videos_dir = f"{media_dir}/videos"
    for d in [base_dir, media_dir, images_dir, videos_dir]:
        os.makedirs(d, exist_ok=True)

    start_time = time.time()
    all_posts = []
    all_comments = []
    media_tasks = []
    seen_permalinks = set()

    # Load existing data
    posts_file = f"{base_dir}/posts.csv"
    if os.path.exists(posts_file):
        try:
            df = pd.read_csv(posts_file)
            seen_permalinks = set(df["permalink"].astype(str).tolist())
        except Exception:
            pass

    async with aiohttp.ClientSession(headers={"User-Agent": USER_AGENT}) as session:
        after = None
        total_fetched = 0

        while total_fetched < limit:
            mirrors = MIRRORS.copy()
            random.shuffle(mirrors)

            data = None
            for mirror in mirrors:
                batch_size = min(100, limit - total_fetched)
                data = await fetch_posts_page(
                    session, mirror, target, after, is_user, batch_size
                )
                if data:
                    break

            if not data:
                break

            children = data.get("data", {}).get("children", [])
            if not children:
                break

            batch_posts = []
            comment_tasks = []

            for child in children:
                p = child["data"]
                post = extract_post_data(p)

                if post["permalink"] in seen_permalinks:
                    continue
                seen_permalinks.add(post["permalink"])

                batch_posts.append(post)

                if download_media:
                    media = extract_media_urls(p)

                    for i, img_url in enumerate(media["images"][:5]):
                        ext = os.path.splitext(urlparse(img_url).path)[1] or ".jpg"
                        save_path = f"{images_dir}/{post['id']}_{i}{ext}"
                        media_tasks.append(
                            download_media_async(session, img_url, save_path)
                        )

                    for i, img_url in enumerate(media["galleries"][:10]):
                        save_path = f"{images_dir}/{post['id']}_gallery_{i}.jpg"
                        media_tasks.append(
                            download_media_async(session, img_url, save_path)
                        )

                    for i, vid_url in enumerate(media["videos"][:2]):
                        if "youtube" not in vid_url:
                            save_path = f"{videos_dir}/{post['id']}_{i}.mp4"
                            if "v.redd.it" in vid_url or "reddit.com" in vid_url:
                                media_tasks.append(
                                    download_reddit_video_with_audio_async(
                                        session, vid_url, save_path
                                    )
                                )
                            else:
                                media_tasks.append(
                                    download_media_async(session, vid_url, save_path)
                                )

                if scrape_comments and post["num_comments"] > 0:
                    comment_tasks.append(
                        fetch_comments_async(session, post["permalink"])
                    )

            all_posts.extend(batch_posts)
            total_fetched += len(batch_posts)

            if comment_tasks:
                comment_results = await asyncio.gather(
                    *comment_tasks, return_exceptions=True
                )
                for result in comment_results:
                    if isinstance(result, list):
                        all_comments.extend(result)

            after = data.get("data", {}).get("after")
            if not after:
                break

            await asyncio.sleep(1)

        if media_tasks:
            await asyncio.gather(*media_tasks, return_exceptions=True)

    # Save data
    if all_posts:
        df = pd.DataFrame(all_posts)
        if os.path.exists(posts_file):
            df.to_csv(posts_file, mode="a", header=False, index=False)
        else:
            df.to_csv(posts_file, index=False)

    if all_comments:
        comments_file = f"{base_dir}/comments.csv"
        df = pd.DataFrame(all_comments)
        if os.path.exists(comments_file):
            df.to_csv(comments_file, mode="a", header=False, index=False)
        else:
            df.to_csv(comments_file, index=False)

    duration = time.time() - start_time

    return {
        "posts": len(all_posts),
        "comments": len(all_comments),
        "duration": duration,
    }


def run_scraper(
    target,
    limit=100,
    is_user=False,
    download_media=True,
    scrape_comments=True,
    data_dir="data",
):
    """Sync wrapper to run async scraper."""
    return asyncio.run(
        scrape_async(target, limit, is_user, download_media, scrape_comments, data_dir)
    )
```
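Outside the MCP server, the same pipeline can be driven through the sync wrapper and the CSVs it writes can be read back with pandas. A minimal sketch, assuming `mcp_reddit.scraper` is importable; `"python"` is an example target, and the output layout (`data/r_<target>/posts.csv`) follows the path construction in scrape_async above.

```python
import pandas as pd

# Minimal sketch, assuming the mcp_reddit package is importable in this environment.
from mcp_reddit.scraper import run_scraper

stats = run_scraper(
    "python",              # target subreddit (is_user defaults to False, so scraped as r/python)
    limit=50,              # cap on posts for this run
    download_media=False,  # text-only run: skip images and videos
    scrape_comments=True,  # also persist comments.csv
    data_dir="data",       # output root; files land under data/r_python/
)
print(f"Scraped {stats['posts']} posts and {stats['comments']} comments "
      f"in {stats['duration']:.1f}s")

# The scraper dedupes on permalink and appends to the existing CSV, so reruns
# only add posts it has not seen before.
posts = pd.read_csv("data/r_python/posts.csv")
print(len(posts), "rows;", list(posts.columns))
```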