We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/king-of-the-grackles/reddit-mcp-poc'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
from typing import Optional, Dict, Any, Literal, List
import praw
from praw.models import Submission, Comment as PrawComment, MoreComments
from prawcore import (
NotFound,
Forbidden,
TooManyRequests,
ServerError,
ResponseException,
)
from fastmcp import Context
from ..models import SubmissionWithCommentsResult, RedditPost, Comment
def parse_comment_tree(
comment: PrawComment,
depth: int = 0,
max_depth: int = 10,
ctx: Context = None
) -> Comment:
"""
Recursively parse a comment and its replies into our Comment model.
Args:
comment: PRAW comment object
depth: Current depth in the comment tree
max_depth: Maximum depth to traverse
ctx: FastMCP context (optional)
Returns:
Parsed Comment object with nested replies
"""
# Phase 1: Accept context but don't use it yet
replies = []
if depth < max_depth and hasattr(comment, 'replies'):
for reply in comment.replies:
if isinstance(reply, PrawComment):
replies.append(parse_comment_tree(reply, depth + 1, max_depth, ctx))
# Skip MoreComments objects for simplicity in MVP
return Comment(
id=comment.id,
body=comment.body,
author=str(comment.author) if comment.author else "[deleted]",
score=comment.score,
created_utc=comment.created_utc,
depth=depth,
replies=replies
)
async def fetch_submission_with_comments(
reddit: praw.Reddit,
submission_id: Optional[str] = None,
url: Optional[str] = None,
comment_limit: int = 100,
comment_sort: Literal["best", "top", "new"] = "best",
ctx: Context = None
) -> Dict[str, Any]:
"""
Fetch a Reddit submission with its comment tree.
Args:
reddit: Configured Reddit client
submission_id: Reddit post ID
url: Full URL to the post (alternative to submission_id)
comment_limit: Maximum number of comments to fetch
comment_sort: How to sort comments
ctx: FastMCP context (auto-injected by decorator)
Returns:
Dictionary containing submission and comments
"""
# Phase 1: Accept context but don't use it yet
try:
# Validate that we have either submission_id or url
if not submission_id and not url:
return {"error": "Either submission_id or url must be provided"}
# Get submission
try:
if submission_id:
submission = reddit.submission(id=submission_id)
else:
submission = reddit.submission(url=url)
# Force fetch to check if submission exists
_ = submission.title
except NotFound as e:
return {
"error": "Submission not found",
"status_code": 404,
"recovery": "Verify the submission_id or url is correct"
}
except Forbidden as e:
return {
"error": "Access to submission forbidden",
"status_code": 403,
"detail": e.response.text[:200] if hasattr(e, 'response') else None,
"recovery": "Submission may be in a private or quarantined subreddit"
}
except TooManyRequests as e:
return {
"error": "Rate limited by Reddit API",
"status_code": 429,
"retry_after_seconds": e.retry_after if hasattr(e, 'retry_after') else None,
"recovery": "Wait before retrying"
}
except ServerError as e:
return {
"error": "Reddit server error",
"status_code": e.response.status_code if hasattr(e, 'response') else 500,
"recovery": "Reddit is experiencing issues - retry after a short delay"
}
except ResponseException as e:
return {
"error": f"Reddit API error: {str(e)}",
"status_code": e.response.status_code if hasattr(e, 'response') else None,
"response_body": e.response.text[:300] if hasattr(e, 'response') else None,
"recovery": "Check submission reference and retry"
}
except Exception as e:
return {
"error": f"Invalid submission reference: {str(e)}",
"error_type": type(e).__name__,
"recovery": "Provide either a valid submission_id or url"
}
# Set comment sort
submission.comment_sort = comment_sort
# Replace "More Comments" with actual comments (up to limit)
submission.comments.replace_more(limit=0) # Don't expand "more" comments in MVP
# Parse submission
submission_data = RedditPost(
id=submission.id,
title=submission.title,
selftext=submission.selftext if submission.selftext else "",
author=str(submission.author) if submission.author else "[deleted]",
subreddit=submission.subreddit.display_name,
score=submission.score,
upvote_ratio=submission.upvote_ratio,
num_comments=submission.num_comments,
created_utc=submission.created_utc,
url=submission.url
)
# Parse comments
comments = []
comment_count = 0
for top_level_comment in submission.comments:
# In tests, we might get regular Mock objects instead of PrawComment
# Check if it has the required attributes
if hasattr(top_level_comment, 'id') and hasattr(top_level_comment, 'body'):
if comment_count >= comment_limit:
break
# Report progress before processing comment
if ctx:
await ctx.report_progress(
progress=comment_count,
total=comment_limit,
message=f"Loading comments ({comment_count}/{comment_limit})"
)
if isinstance(top_level_comment, PrawComment):
comments.append(parse_comment_tree(top_level_comment, ctx=ctx))
else:
# Handle mock objects in tests
comments.append(Comment(
id=top_level_comment.id,
body=top_level_comment.body,
author=str(top_level_comment.author) if top_level_comment.author else "[deleted]",
score=top_level_comment.score,
created_utc=top_level_comment.created_utc,
depth=0,
replies=[]
))
# Count all comments including replies
comment_count += 1 + count_replies(comments[-1])
# Report final completion
if ctx:
await ctx.report_progress(
progress=comment_count,
total=comment_limit,
message=f"Completed: {comment_count} comments loaded"
)
result = SubmissionWithCommentsResult(
submission=submission_data,
comments=comments,
total_comments_fetched=comment_count
)
return result.model_dump()
except TooManyRequests as e:
return {
"error": "Rate limited by Reddit API",
"status_code": 429,
"retry_after_seconds": e.retry_after if hasattr(e, 'retry_after') else None,
"recovery": "Wait before retrying"
}
except ResponseException as e:
return {
"error": f"Reddit API error: {str(e)}",
"status_code": e.response.status_code if hasattr(e, 'response') else None,
"response_body": e.response.text[:300] if hasattr(e, 'response') else None,
"recovery": "Check parameters and retry"
}
except Exception as e:
return {
"error": f"Failed to fetch submission: {str(e)}",
"error_type": type(e).__name__,
"recovery": "Check parameters match schema from get_operation_schema"
}
def count_replies(comment: Comment) -> int:
"""Count total number of replies in a comment tree."""
count = len(comment.replies)
for reply in comment.replies:
count += count_replies(reply)
return count