Skip to main content
Glama
Arindam200

Reddit MCP Server

reply_to_post

Post a reply to an existing Reddit post by providing the post ID and content, enabling interaction with Reddit discussions.

Instructions

Post a reply to an existing Reddit post.

Args:
    post_id: The ID of the post to reply to (can be full URL, permalink, or just ID)
    content: The content of the reply (1-10000 characters)
    subreddit: The subreddit name if known (for validation, with or without 'r/' prefix)

Returns:
    Dictionary containing information about the created reply and parent post

Raises:
    ValueError: If input validation fails or post is not found
    RuntimeError: For other errors during reply creation

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
post_idYes
contentYes
subredditNo

Implementation Reference

  • The core handler function for the 'reply_to_post' MCP tool. It handles posting replies to Reddit posts, including validation, fetching the target submission, checking if the post allows replies, creating the reply via PRAW, and formatting the response with parent post details.
    @require_write_access
    def reply_to_post(
        post_id: str, content: str, subreddit: Optional[str] = None
    ) -> Dict[str, Any]:
        """Post a reply to an existing Reddit post.
    
        Args:
            post_id: The ID of the post to reply to (can be full URL, permalink, or just ID)
            content: The content of the reply (1-10000 characters)
            subreddit: The subreddit name if known (for validation, with or without 'r/' prefix)
    
        Returns:
            Dictionary containing information about the created reply and parent post
    
        Raises:
            ValueError: If input validation fails or post is not found
            RuntimeError: For other errors during reply creation
        """
        manager = RedditClientManager()
        if not manager.client:
            raise RuntimeError("Reddit client not initialized")
    
        # Input validation
        if not post_id or not isinstance(post_id, str):
            raise ValueError("Post ID is required")
        if not content or not isinstance(content, str):
            raise ValueError("Reply content is required")
        if len(content) < 1 or len(content) > 10000:
            raise ValueError("Reply must be between 1 and 10000 characters")
    
        # Clean up subreddit name if provided
        clean_subreddit = None
        if subreddit:
            if not isinstance(subreddit, str):
                raise ValueError("Subreddit name must be a string")
            clean_subreddit = subreddit[2:] if subreddit.startswith("r/") else subreddit
    
        try:
            # Clean up the post_id if it's a full URL or permalink
            clean_post_id = _extract_reddit_id(post_id)
            logger.info(f"Creating reply to post ID: {clean_post_id}")
    
            # Get the submission object
            submission = manager.client.submission(id=clean_post_id)
    
            # Verify the post exists by accessing its attributes
            try:
                post_title = submission.title
                post_author = getattr(submission, "author", None)
                post_subreddit = submission.subreddit
    
                logger.info(
                    f"Replying to post: "
                    f"Title: {post_title}, "
                    f"Author: {post_author}, "
                    f"Subreddit: r/{post_subreddit.display_name}"
                )
    
            except Exception as e:
                logger.exception(f"Failed to access post {clean_post_id}: {e}")
                raise ValueError(f"Post {clean_post_id} not found or inaccessible") from e
    
            # If subreddit was provided, verify we're in the right place
            if (
                clean_subreddit
                and post_subreddit.display_name.lower() != clean_subreddit.lower()
            ):
                raise ValueError(
                    f"Post ID {clean_post_id} belongs to r/{post_subreddit.display_name}, "
                    f"not r/{clean_subreddit}"
                )
    
            # Check if the post is archived or locked
            if getattr(submission, "archived", False):
                raise ValueError("Cannot reply to an archived post")
            if getattr(submission, "locked", False):
                raise ValueError("Cannot reply to a locked post")
    
            # Create the reply
            logger.info(f"Posting reply with content length: {len(content)} characters")
            try:
                reply = submission.reply(body=content)
                logger.info(f"Reply created successfully: {reply.id}")
    
                return {
                    "reply": _format_comment(reply),
                    "parent_post": _format_post(submission),
                    "metadata": {
                        "created_at": _format_timestamp(time.time()),
                        "reply_id": reply.id,
                        "parent_id": clean_post_id,
                        "subreddit": post_subreddit.display_name,
                    },
                }
    
            except Exception as reply_error:
                logger.exception(f"Failed to create reply: {reply_error}")
                if "RATELIMIT" in str(reply_error).upper():
                    raise RuntimeError(
                        "You're doing that too much. Please wait before replying again."
                    ) from reply_error
                if "TOO_OLD" in str(reply_error):
                    raise RuntimeError(
                        "This thread is archived and cannot be replied to"
                    ) from reply_error
                raise RuntimeError(f"Failed to post reply: {reply_error}") from reply_error
    
        except Exception as e:
            logger.exception(f"Error in reply_to_post for ID {post_id}: {e}")
            if isinstance(e, (ValueError, RuntimeError)):
                raise
            raise RuntimeError(f"Failed to create comment reply: {e}") from e
  • server.py:1280-1280 (registration)
    The @mcp.tool() decorator registers the reply_to_post function as an MCP tool in the FastMCP server.
    @require_write_access
  • Decorator applied to reply_to_post that enforces write access requirements by checking Reddit client authentication and read-only status.
    def require_write_access(func: F) -> F:
        """Decorator to ensure write access is available."""
    
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            reddit_manager = RedditClientManager()
            if reddit_manager.is_read_only:
                raise ValueError(
                    "Write operation not allowed in read-only mode. Please provide valid credentials."
                )
            if not reddit_manager.check_user_auth():
                raise Exception(
                    "Authentication required for write operations. "
                    "Please provide valid REDDIT_USERNAME and REDDIT_PASSWORD environment variables."
                )
            return func(*args, **kwargs)
    
        return cast(F, wrapper)
  • Helper function used by reply_to_post to extract the base Reddit post ID from a URL, permalink, or raw ID.
    def _extract_reddit_id(reddit_id: str) -> str:
        """Extract the base ID from a Reddit URL or ID.
    
        Args:
            reddit_id: Either a Reddit ID or a URL containing the ID
    
        Returns:
            The extracted Reddit ID
        """
        if not reddit_id:
            raise ValueError("Empty ID provided")
    
        # If it's a URL, extract the ID part
        if "/" in reddit_id:
            # Handle both standard URLs and permalinks
            parts = [p for p in reddit_id.split("/") if p]
            # The ID is typically the last non-empty part
            reddit_id = parts[-1]
            logger.debug(f"Extracted ID {reddit_id} from URL")
    
        return reddit_id
  • Helper function used by reply_to_post to format the created reply comment for the response.
    def _format_comment(comment: praw.models.Comment) -> str:
        """Format comment information with AI-driven insights."""
        flags = []
        if comment.edited:
            flags.append("Edited")
        if hasattr(comment, "is_submitter") and comment.is_submitter:
            flags.append("OP")
    
        return f"""
            • Author: u/{str(comment.author)}
            • Content: {comment.body}
            • Stats:
            - Score: {comment.score:,}
            - Controversiality: {comment.controversiality if hasattr(comment, "controversiality") else "Unknown"}
            • Context:
            - Subreddit: r/{str(comment.subreddit)}
            - Thread: {comment.submission.title}
            • Metadata:
            - Posted: {_format_timestamp(comment.created_utc)}
            - Flags: {", ".join(flags) if flags else "None"}
            • Link: https://reddit.com{comment.permalink}
    
            💬 Comment Analysis:
            - {_analyze_comment_impact(comment.score, bool(comment.edited), hasattr(comment, "is_submitter"))}
            """

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arindam200/reddit-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server