Skip to main content
Glama
Arindam200

Reddit MCP Server

get_top_posts

Retrieve top-performing posts from any Reddit community by specifying subreddit, time period, and post count. This tool helps analyze trending content and community engagement patterns.

Instructions

Get top posts from a subreddit.

Args:
    subreddit: Name of the subreddit (with or without 'r/' prefix)
    time_filter: Time period to filter posts (e.g. "day", "week", "month", "year", "all")
    limit: Number of posts to fetch (1-100)
    include_comments: If True, load and return the full comment forest for each post
    comment_replace_more_limit: Limit for replacing "MoreComments" objects (0 for none, None for all)

Returns:
    Dictionary containing structured post information with the following structure:
    {
        'subreddit': str,  # Subreddit name
        'time_filter': str,  # The time period used for filtering
        'posts': [  # List of posts, each with the following structure:
            {
                'id': str,  # Post ID
                'title': str,  # Post title
                'author': str,  # Author's username
                'score': int,  # Post score (upvotes - downvotes)
                'upvote_ratio': float,  # Ratio of upvotes to total votes
                'num_comments': int,  # Number of comments
                'created_utc': float,  # Post creation timestamp
                'url': str,  # URL to the post
                'permalink': str,  # Relative URL to the post
                'is_self': bool,  # Whether it's a self (text) post
                'selftext': str,  # Content of self post (if any)
                'link_url': str,  # URL for link posts (if any)
                'over_18': bool,  # Whether marked as NSFW
                'spoiler': bool,  # Whether marked as spoiler
                'stickied': bool,  # Whether stickied in the subreddit
                'locked': bool,  # Whether comments are locked
                'distinguished': Optional[str],  # Distinguishing type (e.g., 'moderator')
                'flair': Optional[Dict],  # Post flair information if any
                'comments': Optional[List[Dict]],  # present if include_comments is True
            },
            ...
        ],
        'metadata': {
            'fetched_at': float,  # Timestamp when data was fetched
            'post_count': int,  # Number of posts returned
        }
    }

Raises:
    ValueError: If subreddit is invalid or time_filter is not valid
    RuntimeError: For other errors during the operation

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
subredditYes
time_filterNoweek
limitNo
include_commentsNo
comment_replace_more_limitNo

Implementation Reference

  • Primary handler function implementing the 'get_top_posts' MCP tool. Fetches top posts from specified subreddit using PRAW Reddit API, formats into structured dict with optional full comment trees, includes validation and error handling.
    @mcp.tool()
    def get_top_posts(
        subreddit: str,
        time_filter: str = "week",
        limit: int = 10,
        include_comments: bool = False,
        comment_replace_more_limit: int = 0,
    ) -> Dict[str, Any]:
        """Get top posts from a subreddit.
    
        Args:
            subreddit: Name of the subreddit (with or without 'r/' prefix)
            time_filter: Time period to filter posts (e.g. "day", "week", "month", "year", "all")
            limit: Number of posts to fetch (1-100)
            include_comments: If True, load and return the full comment forest for each post
            comment_replace_more_limit: Limit for replacing "MoreComments" objects (0 for none, None for all)
    
        Returns:
            Dictionary containing structured post information with the following structure:
            {
                'subreddit': str,  # Subreddit name
                'time_filter': str,  # The time period used for filtering
                'posts': [  # List of posts, each with the following structure:
                    {
                        'id': str,  # Post ID
                        'title': str,  # Post title
                        'author': str,  # Author's username
                        'score': int,  # Post score (upvotes - downvotes)
                        'upvote_ratio': float,  # Ratio of upvotes to total votes
                        'num_comments': int,  # Number of comments
                        'created_utc': float,  # Post creation timestamp
                        'url': str,  # URL to the post
                        'permalink': str,  # Relative URL to the post
                        'is_self': bool,  # Whether it's a self (text) post
                        'selftext': str,  # Content of self post (if any)
                        'link_url': str,  # URL for link posts (if any)
                        'over_18': bool,  # Whether marked as NSFW
                        'spoiler': bool,  # Whether marked as spoiler
                        'stickied': bool,  # Whether stickied in the subreddit
                        'locked': bool,  # Whether comments are locked
                        'distinguished': Optional[str],  # Distinguishing type (e.g., 'moderator')
                        'flair': Optional[Dict],  # Post flair information if any
                        'comments': Optional[List[Dict]],  # present if include_comments is True
                    },
                    ...
                ],
                'metadata': {
                    'fetched_at': float,  # Timestamp when data was fetched
                    'post_count': int,  # Number of posts returned
                }
            }
    
        Raises:
            ValueError: If subreddit is invalid or time_filter is not valid
            RuntimeError: For other errors during the operation
        """
        manager = RedditClientManager()
        if not manager.client:
            raise RuntimeError("Reddit client not initialized")
    
        if not subreddit or not isinstance(subreddit, str):
            raise ValueError("Subreddit name is required")
    
        valid_time_filters = ["hour", "day", "week", "month", "year", "all"]
        if time_filter not in valid_time_filters:
            raise ValueError(
                f"Invalid time filter. Must be one of: {', '.join(valid_time_filters)}"
            )
    
        limit = max(1, min(100, limit))  # Ensure limit is between 1 and 100
    
        # Clean up subreddit name (remove r/ prefix if present)
        clean_subreddit = subreddit[2:] if subreddit.startswith("r/") else subreddit
    
        try:
            logger.info(
                f"Getting top {limit} posts from r/{clean_subreddit} "
                f"(time_filter={time_filter}, include_comments={include_comments})"
            )
    
            # Get the subreddit
            sub = manager.client.subreddit(clean_subreddit)
    
            # Verify subreddit exists and is accessible
            _ = sub.display_name
    
            # Fetch posts
            posts = list(sub.top(time_filter=time_filter, limit=limit))
    
            if not posts:
                return {
                    "subreddit": clean_subreddit,
                    "time_filter": time_filter,
                    "posts": [],
                    "metadata": {"fetched_at": time.time(), "post_count": 0},
                }
    
            # Format posts into structured data
            formatted_posts = []
            for post in posts:
                try:
                    # Get post data with error handling for each field
                    post_data: Dict[str, Any] = {
                        "id": post.id,
                        "title": post.title,
                        "author": str(post.author) if getattr(post, "author", None) else "[deleted]",
                        "score": getattr(post, "score", 0),
                        "upvote_ratio": getattr(post, "upvote_ratio", 0.0),
                        "num_comments": getattr(post, "num_comments", 0),
                        "created_utc": post.created_utc,
                        "url": f"https://www.reddit.com{post.permalink}" if hasattr(post, "permalink") else "",
                        "permalink": getattr(post, "permalink", ""),
                        "is_self": getattr(post, "is_self", False),
                        "selftext": getattr(post, "selftext", ""),
                        "link_url": getattr(post, "url", ""),
                        "over_18": getattr(post, "over_18", False),
                        "spoiler": getattr(post, "spoiler", False),
                        "stickied": getattr(post, "stickied", False),
                        "locked": getattr(post, "locked", False),
                        "distinguished": getattr(post, "distinguished", None),
                    }
    
                    # Add flair information if available
                    if hasattr(post, "link_flair_text") and post.link_flair_text:
                        post_data["flair"] = {
                            "text": post.link_flair_text,
                            "css_class": getattr(post, "link_flair_css_class", ""),
                            "template_id": getattr(post, "link_flair_template_id", None),
                            "text_color": getattr(post, "link_flair_text_color", None),
                            "background_color": getattr(
                                post, "link_flair_background_color", None
                            ),
                        }
                    else:
                        post_data["flair"] = None
    
                    # Add comments if requested
                    if include_comments:
                        try:
                            # Resolve all MoreComments to get the complete tree
                            # limit=0 removes no MoreComments, limit=None removes all (slow!)
                            post.comments.replace_more(limit=comment_replace_more_limit)
    
                            top_level_comments = [
                                c
                                for c in post.comments
                                if isinstance(c, praw.models.Comment)
                            ]
    
                            post_data["comments"] = [
                                _serialize_comment_tree(c) for c in top_level_comments
                            ]
                        except Exception as comments_error:
                            logger.exception(
                                f"Error loading comments for post {getattr(post, 'id', 'unknown')}"
                            )
                            post_data["comments"] = []
    
                    formatted_posts.append(post_data)
    
                except Exception as post_error:
                    logger.error(
                        f"Error processing post {getattr(post, 'id', 'unknown')}: {post_error}"
                    )
                    continue
    
            return {
                "subreddit": clean_subreddit,
                "time_filter": time_filter,
                "posts": formatted_posts,
                "metadata": {
                    "fetched_at": time.time(),
                    "post_count": len(formatted_posts),
                },
            }
    
        except Exception as e:
            logger.error(f"Error getting top posts from r/{clean_subreddit}: {e}")
            if "private" in str(e).lower():
                raise ValueError(
                    f"r/{clean_subreddit} is private or cannot be accessed"
                ) from e
            if "banned" in str(e).lower():
                raise ValueError(
                    f"r/{clean_subreddit} has been banned or doesn't exist"
                ) from e
            if "not found" in str(e).lower():
                raise ValueError(f"r/{clean_subreddit} not found") from e
            raise RuntimeError(f"Failed to get top posts: {e}") from e
  • Recursive helper to serialize PRAW comment trees into flat JSON-compatible dicts, used by get_top_posts when include_comments=True to include full comment forests.
    def _serialize_comment_tree(comment: praw.models.Comment) -> Dict[str, Any]:
        """Serialize a PRAW comment into a JSON-serializable tree structure."""
        try:
            replies = []
            if getattr(comment, "replies", None):
                replies = [
                    _serialize_comment_tree(reply)
                    for reply in comment.replies
                    if isinstance(reply, praw.models.Comment)
                ]
        except Exception as e:
            logger.error(f"Error while serializing replies for comment {getattr(comment, 'id', 'unknown')}: {e}")
            replies = []
    
        return {
            "id": comment.id,
            "author": str(comment.author) if comment.author else "[deleted]",
            "body": getattr(comment, "body", ""),
            "score": getattr(comment, "score", 0),
            "created_utc": getattr(comment, "created_utc", 0.0),
            "permalink": getattr(comment, "permalink", ""),
            "is_submitter": getattr(comment, "is_submitter", False),
            "distinguished": getattr(comment, "distinguished", None),
            "stickied": getattr(comment, "stickied", False),
            "locked": getattr(comment, "locked", False),
            "replies": replies,
        }
  • Singleton manager for PRAW Reddit client instance, used by all tools including get_top_posts for API access with fallback to read-only mode.
    class RedditClientManager:
        """Manages the Reddit client and its state."""
    
        _instance = None
        _client = None
        _is_read_only = True
    
        def __new__(cls) -> "RedditClientManager":
            if cls._instance is None:
                cls._instance = super(RedditClientManager, cls).__new__(cls)
                cls._instance._initialize_client()
            return cls._instance
    
        def _initialize_client(self) -> None:
            """Initialize the Reddit client with appropriate credentials."""
            client_id = getenv("REDDIT_CLIENT_ID")
            client_secret = getenv("REDDIT_CLIENT_SECRET")
            user_agent = getenv("REDDIT_USER_AGENT", "RedditMCPServer v1.0")
            username = getenv("REDDIT_USERNAME")
            password = getenv("REDDIT_PASSWORD")
    
            self._is_read_only = True
    
            try:
                # Try authenticated access first if credentials are provided
                if all([username, password, client_id, client_secret]):
                    logger.info(
                        f"Attempting to initialize Reddit client with user authentication for u/{username}"
                    )
                    try:
                        self._client = praw.Reddit(
                            client_id=client_id,
                            client_secret=client_secret,
                            user_agent=user_agent,
                            username=username,
                            password=password,
                            check_for_updates=False,
                        )
                        # Test authentication
                        if self._client.user.me() is None:
                            raise ValueError(f"Failed to authenticate as u/{username}")
    
                        logger.info(f"Successfully authenticated as u/{username}")
                        self._is_read_only = False
                        return
                    except Exception as auth_error:
                        logger.warning(f"Authentication failed: {auth_error}")
                        logger.info("Falling back to read-only access")
    
                # Fall back to read-only with client credentials
                if client_id and client_secret:
                    logger.info("Initializing Reddit client with read-only access")
                    self._client = praw.Reddit(
                        client_id=client_id,
                        client_secret=client_secret,
                        user_agent=user_agent,
                        check_for_updates=False,
                        read_only=True,
                    )
                    return
    
                # Last resort: read-only without credentials
                logger.info(
                    "Initializing Reddit client in read-only mode without credentials"
                )
                self._client = praw.Reddit(
                    user_agent=user_agent,
                    check_for_updates=False,
                    read_only=True,
                )
                # Test read-only access
                self._client.subreddit("popular").hot(limit=1)
    
            except Exception as e:
                logger.error(f"Error initializing Reddit client: {e}")
                self._client = None
    
        @property
        def client(self) -> Optional[praw.Reddit]:
            """Get the Reddit client instance."""
            return self._client
    
        @property
        def is_read_only(self) -> bool:
            """Check if the client is in read-only mode."""
            return self._is_read_only
    
        def check_user_auth(self) -> bool:
            """Check if user authentication is available for write operations."""
            if not self._client:
                logger.error("Reddit client not initialized")
                return False
            if self._is_read_only:
                logger.error("Reddit client is in read-only mode")
                return False
            return True
  • server.py:707-707 (registration)
    MCP tool registration decorator for the get_top_posts function, handled by FastMCP instance at line 137.
    @mcp.tool()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arindam200/reddit-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server