# System Architecture - Reddit MCP Server **Version:** 1.0 **Last Updated:** 2025-11-05 **Target:** Apify $1M Challenge - 5,000 MAU, Quality Score 65+ --- ## Table of Contents 1. [High-Level Architecture](#1-high-level-architecture) 2. [Component Breakdown](#2-component-breakdown) 3. [Data Models](#3-data-models) 4. [Component Interaction Flows](#4-component-interaction-flows) 5. [Technology Stack Justification](#5-technology-stack-justification) 6. [Performance & Scalability](#6-performance--scalability) 7. [Security Architecture](#7-security-architecture) 8. [Deployment Architecture](#8-deployment-architecture) --- ## 1. High-Level Architecture ### 1.1 System Overview (ASCII Diagram) ``` ┌─────────────────────────────────────────────────────────────────────┐ │ MCP CLIENT LAYER │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Claude │ │ ChatGPT │ │ Cursor │ │ Other │ │ │ │ Desktop │ │ App │ │ IDE │ │ Clients │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ │ └─────────────┴──────────────┴──────────────┘ │ │ │ │ │ JSON-RPC 2.0 │ │ (stdio/HTTP) │ └───────────────────────────┬──────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────┐ │ MCP SERVER LAYER (FastMCP) │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ Request Router │ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ │ │ Tool 1 │ │ Tool 2 │ │ Tool 3 │ │ Tool 8 │ ... │ │ │ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │ │ └────────────────────────┬───────────────────────────────────┘ │ │ │ │ │ ┌────────────────────────▼───────────────────────────────────┐ │ │ │ Middleware Stack │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ Input Validation (Pydantic) │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ Authentication & Authorization │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ Rate Limiting (User-level) │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ Error Handler & Response Formatter │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └────────────────────────┬───────────────────────────────────┘ │ └───────────────────────────┼──────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────┐ │ REDDIT API INTEGRATION LAYER │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ Cache Manager (Redis) │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ │ │ Check Cache │→ │ Hit: Return │ │ Miss: Fetch │ │ │ │ │ └─────────────┘ └─────────────┘ └──────┬──────┘ │ │ │ │ │ │ │ │ │ ┌──────────────────────────────────────────▼──────────┐ │ │ │ │ │ Cache Key Generator: reddit:{tool}:{hash}:{v} │ │ │ │ │ └──────────────────────────────────────────────────────┘ │ │ │ └────────────────────────┬───────────────────────────────────┘ │ │ │ Cache Miss │ │ ▼ │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ Reddit API Client (PRAW) │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ OAuth2 Token Manager │ │ │ │ │ │ - Token refresh (expires 1h) │ │ │ │ │ │ - Credential rotation │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 
┌─────────────────────────────────────────────────────┐ │ │ │ │ │ Rate Limit Manager (Token Bucket) │ │ │ │ │ │ - 100 QPM (free tier) │ │ │ │ │ │ - Priority queue │ │ │ │ │ │ - Exponential backoff │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ Request Queue & Deduplication │ │ │ │ │ │ - Coalesce identical requests │ │ │ │ │ │ - Priority: real-time > historical │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ Response Normalizer │ │ │ │ │ │ - Standard output format │ │ │ │ │ │ - Error mapping │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └────────────────────────┬───────────────────────────────────┘ │ └───────────────────────────┼──────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────┐ │ EXTERNAL SERVICES │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ Reddit API │ │ Redis Cloud │ │ VADER NLP │ │ │ │ (100 QPM) │ │ (Caching) │ │ (Sentiment) │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ │ └─────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────┐ │ APIFY ACTOR INFRASTRUCTURE │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ Standby Mode (Always-on HTTP endpoint) │ │ │ │ - Endpoint: https://{username}--{actor}.apify.actor/mcp │ │ │ │ - SSE Transport for streaming │ │ │ │ - Pay-per-event billing │ │ │ └────────────────────────────────────────────────────────────┘ │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ Monitoring & Analytics │ │ │ │ - Usage tracking (Apify Datasets) │ │ │ │ - Performance metrics │ │ │ │ - Error logs │ │ │ └────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────┘ ``` ### 1.2 Data Flow Summary 1. **Client Request** → MCP client sends JSON-RPC request via stdio or HTTP/SSE 2. **Server Routing** → FastMCP routes to appropriate tool based on method name 3. **Middleware Processing** → Validation, auth, rate limiting, logging 4. **Cache Check** → Redis lookup using hashed cache key 5. **Cache Hit** → Return cached response with metadata 6. **Cache Miss** → Forward to Reddit API Integration Layer 7. **API Request** → PRAW makes OAuth2-authenticated request to Reddit API 8. **Response Processing** → Normalize, cache, return to client 9. **Monitoring** → Log metrics to Apify datasets for analytics --- ## 2. Component Breakdown ### 2.1 MCP Server Layer (FastMCP) #### A. Server Initialization ```python from mcp.server.fastmcp import FastMCP from mcp.server.sse import SseServerTransport import os # Initialize MCP server with metadata mcp = FastMCP( name="reddit-mcp-server", version="1.0.0", description="Enterprise Reddit MCP Server with caching and sentiment analysis" ) # Configure capabilities mcp.configure( capabilities={ "tools": {}, "resources": {"subscribe": False}, # Future: real-time updates "prompts": {} } ) ``` #### B. 
Tool Registration & Routing Each tool is registered as a decorated function with input validation: ```python from pydantic import BaseModel, Field, validator from typing import Optional, Literal class SearchRedditInput(BaseModel): query: str = Field(..., min_length=1, max_length=500, description="Search query") subreddit: Optional[str] = Field(None, pattern="^[A-Za-z0-9_]+$") time_filter: Literal["hour", "day", "week", "month", "year", "all"] = "week" sort: Literal["relevance", "hot", "top", "new", "comments"] = "relevance" limit: int = Field(25, ge=1, le=100) @validator('query') def sanitize_query(cls, v): return v.strip() @mcp.tool() async def search_reddit(params: SearchRedditInput) -> dict: """Search Reddit for posts matching query""" # Implementation in section 2.4 pass ``` **Routing Logic:** - FastMCP automatically maps JSON-RPC method names to Python functions - Pattern: `tools/call` with `name: "search_reddit"` → `search_reddit()` function - No manual routing code needed (framework handles it) #### C. Request/Response Handling **Request Flow:** 1. Receive JSON-RPC request 2. Validate method exists 3. Parse parameters against Pydantic model 4. Execute tool function 5. Serialize response 6. Return JSON-RPC response **Response Format:** ```python class ToolResponse(BaseModel): results: List[dict] metadata: ResponseMetadata class ResponseMetadata(BaseModel): cached: bool cache_age_seconds: Optional[int] rate_limit_remaining: int execution_time_ms: float reddit_api_calls: int ``` #### D. Error Handling Architecture **Error Hierarchy:** ```python from mcp.types import MCPError class RedditMCPError(MCPError): """Base error for Reddit MCP""" pass class ValidationError(RedditMCPError): """Input validation failed""" code = -32602 # Invalid params class RateLimitError(RedditMCPError): """Rate limit exceeded""" code = -32000 # Server error class RedditAPIError(RedditMCPError): """Reddit API returned error""" code = -32001 class CacheError(RedditMCPError): """Redis cache unavailable""" code = -32002 ``` **Error Handler Middleware:** ```python import traceback import logging async def error_handler_middleware(func): async def wrapper(*args, **kwargs): try: return await func(*args, **kwargs) except ValidationError as e: logger.warning(f"Validation error: {e}") raise except RateLimitError as e: logger.warning(f"Rate limit hit: {e}") raise MCPError( code=-32000, message="Rate limit exceeded", data={ "retry_after_seconds": e.retry_after, "rate_limit_tier": "free" } ) except RedditAPIError as e: logger.error(f"Reddit API error: {e}") raise MCPError( code=-32001, message="Reddit API unavailable", data={"status_code": e.status_code} ) except Exception as e: logger.error(f"Unexpected error: {traceback.format_exc()}") raise MCPError( code=-32603, message="Internal server error", data={"error_type": type(e).__name__} ) return wrapper ``` #### E. 
Logging and Monitoring Hooks ```python import structlog from datetime import datetime logger = structlog.get_logger() class MonitoringMiddleware: async def log_request(self, tool_name: str, params: dict, user_id: str): logger.info( "tool_request", tool=tool_name, user_id=user_id, timestamp=datetime.utcnow().isoformat(), params_hash=hash_params(params) ) async def log_response(self, tool_name: str, duration_ms: float, cached: bool, error: Optional[str]): logger.info( "tool_response", tool=tool_name, duration_ms=duration_ms, cached=cached, error=error, timestamp=datetime.utcnow().isoformat() ) # Send to Apify Dataset for analytics await Actor.push_data({ "event": "tool_execution", "tool": tool_name, "duration_ms": duration_ms, "cached": cached, "error": error, "timestamp": datetime.utcnow().isoformat() }) ``` --- ### 2.2 Reddit API Integration Layer #### A. PRAW Client Configuration ```python import praw from typing import Optional class RedditClientManager: def __init__(self): self.client: Optional[praw.Reddit] = None self._initialize_client() def _initialize_client(self): """Initialize PRAW with OAuth2 credentials""" self.client = praw.Reddit( client_id=os.getenv("REDDIT_CLIENT_ID"), client_secret=os.getenv("REDDIT_CLIENT_SECRET"), user_agent=os.getenv( "REDDIT_USER_AGENT", "Reddit-MCP-Server/1.0 by /u/apify-mcp" ), # Read-only mode (no user auth needed for most operations) username=None, password=None ) # Configure PRAW settings self.client.read_only = True self.client.config.timeout = 30 # seconds # Validate credentials try: self.client.user.me() # Test auth except Exception as e: logger.error(f"Reddit auth failed: {e}") raise RedditAPIError("Authentication failed") def get_client(self) -> praw.Reddit: """Get Reddit client instance""" if not self.client: self._initialize_client() return self.client # Singleton instance reddit_client = RedditClientManager() ``` #### B. OAuth2 Authentication Flow **Token Lifecycle:** - Reddit OAuth2 tokens expire after 1 hour - PRAW automatically refreshes tokens - Fallback: Manual refresh if PRAW fails ```python import time from threading import Lock class OAuth2TokenManager: def __init__(self): self.access_token: Optional[str] = None self.refresh_token: Optional[str] = None self.expires_at: Optional[float] = None self.lock = Lock() def get_valid_token(self) -> str: """Get valid access token, refresh if needed""" with self.lock: if self._is_token_expired(): self._refresh_token() return self.access_token def _is_token_expired(self) -> bool: """Check if token is expired or expiring soon (5 min buffer)""" if not self.expires_at: return True return time.time() >= (self.expires_at - 300) def _refresh_token(self): """Refresh OAuth2 token""" try: # PRAW handles this automatically, but manual fallback: response = requests.post( "https://www.reddit.com/api/v1/access_token", auth=(REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET), data={"grant_type": "client_credentials"}, headers={"User-Agent": REDDIT_USER_AGENT} ) data = response.json() self.access_token = data["access_token"] self.expires_at = time.time() + data["expires_in"] except Exception as e: logger.error(f"Token refresh failed: {e}") raise RedditAPIError("Token refresh failed") ``` #### C. 
Rate Limit Manager (Token Bucket Algorithm) ```python from collections import deque from datetime import datetime, timedelta import asyncio class TokenBucketRateLimiter: def __init__(self, max_calls: int = 100, period_seconds: int = 60): """ Token bucket algorithm for rate limiting Args: max_calls: Maximum calls per period (100 for Reddit free tier) period_seconds: Time period in seconds (60 for per-minute) """ self.max_calls = max_calls self.period = timedelta(seconds=period_seconds) self.calls = deque() # Store timestamps of API calls self.lock = asyncio.Lock() async def acquire(self, priority: int = 0) -> bool: """ Acquire permission to make API call Args: priority: 0 = normal, 1 = high (real-time), -1 = low (background) Returns: True when permission granted """ async with self.lock: # Remove old calls outside the time window now = datetime.utcnow() while self.calls and now - self.calls[0] > self.period: self.calls.popleft() # Check if we have capacity if len(self.calls) < self.max_calls: self.calls.append(now) return True # Wait until oldest call expires wait_time = (self.calls[0] + self.period - now).total_seconds() logger.warning( f"Rate limit reached, waiting {wait_time:.2f}s", calls_made=len(self.calls), max_calls=self.max_calls ) await asyncio.sleep(wait_time + 0.1) # Small buffer # Retry after waiting return await self.acquire(priority) def get_remaining(self) -> int: """Get remaining calls in current window""" now = datetime.utcnow() valid_calls = [c for c in self.calls if now - c <= self.period] return self.max_calls - len(valid_calls) # Global rate limiter instance rate_limiter = TokenBucketRateLimiter(max_calls=100, period_seconds=60) ``` #### D. Request Queue System ```python from asyncio import PriorityQueue import hashlib import json class RequestQueue: def __init__(self): self.queue = PriorityQueue() self.in_flight = {} # Track duplicate requests self.lock = asyncio.Lock() def _hash_request(self, endpoint: str, params: dict) -> str: """Generate hash for request deduplication""" request_str = f"{endpoint}:{json.dumps(params, sort_keys=True)}" return hashlib.sha256(request_str.encode()).hexdigest() async def enqueue(self, endpoint: str, params: dict, priority: int = 0) -> str: """ Add request to queue Args: endpoint: Reddit API endpoint params: Request parameters priority: 0 = normal, 1 = high, -1 = low Returns: Request hash for tracking """ request_hash = self._hash_request(endpoint, params) async with self.lock: # Check if identical request is already in flight if request_hash in self.in_flight: logger.debug( "Duplicate request detected, coalescing", request_hash=request_hash ) return request_hash # Add to queue (negative priority for ascending order) await self.queue.put((-priority, request_hash, endpoint, params)) self.in_flight[request_hash] = { "status": "queued", "waiters": [] } return request_hash async def dequeue(self) -> tuple: """Get next request from queue""" priority, request_hash, endpoint, params = await self.queue.get() return request_hash, endpoint, params async def complete(self, request_hash: str, result: any): """Mark request as completed and notify waiters""" async with self.lock: if request_hash in self.in_flight: self.in_flight[request_hash]["status"] = "completed" self.in_flight[request_hash]["result"] = result # Notify all waiting for this request for waiter in self.in_flight[request_hash]["waiters"]: waiter.set_result(result) # Clean up after 60 seconds asyncio.create_task(self._cleanup_request(request_hash, 60)) request_queue = RequestQueue() ``` 
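The `RequestQueue` above registers `waiters` for coalesced duplicates, but the snippet stops short of showing how a duplicate caller actually receives the shared result once `complete()` fires. A minimal sketch of that missing piece, assuming the same `in_flight` bookkeeping (the helper name `wait_for_result` is not part of the original design):

```python
import asyncio

# Hypothetical helper (not in the original snippet): a coalesced caller awaits the
# result that RequestQueue.complete() publishes for its request hash.
async def wait_for_result(queue, request_hash: str, timeout: float = 30.0):
    async with queue.lock:
        entry = queue.in_flight.get(request_hash)
        if entry is None:
            raise KeyError(f"Unknown request: {request_hash}")
        if entry.get("status") == "completed":
            return entry["result"]
        # Register a future; RequestQueue.complete() resolves it via set_result().
        waiter = asyncio.get_running_loop().create_future()
        entry["waiters"].append(waiter)
    return await asyncio.wait_for(waiter, timeout=timeout)
```

A duplicate caller would then `await wait_for_result(request_queue, request_hash)` after `enqueue()` reports the hash as already in flight.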
#### E. Retry Logic with Exponential Backoff

```python
import asyncio
import random

import prawcore
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)

# RedditAPIError, rate_limiter and logger are defined in the earlier sections.

class RetryableRedditRequest:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((
            prawcore.exceptions.ServerError,
            prawcore.exceptions.RequestException
        )),
        reraise=True
    )
    async def execute(self, func, *args, **kwargs):
        """
        Execute Reddit API request with exponential backoff

        Retry on:
        - 500/502/503 (Reddit server errors)
        - Network timeouts
        - Temporary connection failures

        Do NOT retry on:
        - 401 (auth errors)
        - 403 (permission errors)
        - 404 (not found)
        - 422 (invalid params)
        """
        try:
            # Add jitter to prevent thundering herd
            await asyncio.sleep(random.uniform(0, 0.1))

            # Acquire rate limit token
            await rate_limiter.acquire()

            # Execute request
            result = await asyncio.to_thread(func, *args, **kwargs)
            return result

        except prawcore.exceptions.Forbidden as e:
            logger.error(f"Permission denied: {e}")
            raise RedditAPIError(f"Access forbidden: {e}")

        except prawcore.exceptions.NotFound as e:
            logger.warning(f"Resource not found: {e}")
            return None  # Return None for deleted/removed content

        except prawcore.exceptions.InvalidToken as e:
            logger.error(f"Invalid OAuth token: {e}")
            raise RedditAPIError("Authentication error")

retry_handler = RetryableRedditRequest()
```

#### F. Response Normalization

```python
from typing import Any, Dict
from datetime import datetime

class ResponseNormalizer:
    @staticmethod
    def normalize_post(submission) -> Dict[str, Any]:
        """Normalize Reddit submission to standard format"""
        return {
            "id": submission.id,
            "type": "post",
            "title": submission.title,
            "author": submission.author.name if submission.author else "[deleted]",
            "subreddit": submission.subreddit.display_name,
            "created_utc": int(submission.created_utc),
            "score": submission.score,
            "upvote_ratio": submission.upvote_ratio,
            "num_comments": submission.num_comments,
            "url": submission.url,
            "permalink": submission.permalink,
            "selftext": submission.selftext[:1000] if submission.selftext else "",
            "link_flair_text": submission.link_flair_text,
            "is_self": submission.is_self,
            "is_video": submission.is_video,
            "over_18": submission.over_18,
            "spoiler": submission.spoiler,
            "stickied": submission.stickied
        }

    @staticmethod
    def normalize_comment(comment) -> Dict[str, Any]:
        """Normalize Reddit comment to standard format"""
        return {
            "id": comment.id,
            "type": "comment",
            "author": comment.author.name if comment.author else "[deleted]",
            "body": comment.body,
            "score": comment.score,
            "created_utc": int(comment.created_utc),
            "depth": comment.depth,
            "parent_id": comment.parent_id,
            "is_submitter": comment.is_submitter,
            "stickied": comment.stickied,
            "distinguished": comment.distinguished
        }

    @staticmethod
    def normalize_user(redditor) -> Dict[str, Any]:
        """Normalize Reddit user to standard format"""
        return {
            "username": redditor.name,
            "created_utc": int(redditor.created_utc),
            "link_karma": redditor.link_karma,
            "comment_karma": redditor.comment_karma,
            "is_gold": redditor.is_gold,
            "is_mod": redditor.is_mod,
            "has_verified_email": redditor.has_verified_email
        }

    @staticmethod
    def normalize_subreddit(subreddit) -> Dict[str, Any]:
        """Normalize subreddit to standard format"""
        return {
            "name": subreddit.display_name,
            "title": subreddit.title,
            "description": subreddit.public_description,
            "subscribers": subreddit.subscribers,
            "active_users": subreddit.active_user_count,
            "created_utc": int(subreddit.created_utc),
            "over18": subreddit.over18,
            "url": subreddit.url
        }
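# Added note: PRAW models are lazy, so the attribute reads above can trigger network
# I/O the first time they run. In the async tools these normalize_* calls are expected
# to execute inside asyncio.to_thread (see RetryableRedditRequest.execute) rather than
# directly on the event loop.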
normalizer = ResponseNormalizer() ``` --- ### 2.3 Caching System (Redis) #### A. Redis Configuration and Connection Pooling ```python import redis.asyncio as redis from redis.asyncio.connection import ConnectionPool class RedisCache: def __init__(self): self.pool: Optional[ConnectionPool] = None self.client: Optional[redis.Redis] = None self._initialize_pool() def _initialize_pool(self): """Initialize Redis connection pool""" redis_url = os.getenv( "REDIS_URL", "redis://localhost:6379/0" ) self.pool = ConnectionPool.from_url( redis_url, max_connections=20, decode_responses=True, socket_timeout=5, socket_connect_timeout=5, retry_on_timeout=True ) self.client = redis.Redis(connection_pool=self.pool) async def ping(self) -> bool: """Check Redis connection""" try: return await self.client.ping() except Exception as e: logger.error(f"Redis connection failed: {e}") return False async def close(self): """Close Redis connection pool""" if self.client: await self.client.close() if self.pool: await self.pool.disconnect() cache = RedisCache() ``` #### B. Cache Key Strategy **Pattern:** `reddit:{tool}:{params_hash}:{version}` ```python import hashlib import json class CacheKeyGenerator: VERSION = "v1" # Increment when response format changes @staticmethod def generate(tool_name: str, params: dict) -> str: """ Generate cache key for Reddit tool request Args: tool_name: Name of the MCP tool params: Tool parameters (dict) Returns: Cache key string Examples: reddit:search:a3f8d9c2e1b4:v1 reddit:subreddit_posts:7b2e4a1c9d3f:v1 """ # Sort params for consistent hashing params_str = json.dumps(params, sort_keys=True) params_hash = hashlib.md5(params_str.encode()).hexdigest()[:12] return f"reddit:{tool_name}:{params_hash}:{CacheKeyGenerator.VERSION}" @staticmethod def parse(cache_key: str) -> dict: """Parse cache key back to components""" parts = cache_key.split(":") if len(parts) != 4: raise ValueError(f"Invalid cache key format: {cache_key}") return { "prefix": parts[0], "tool": parts[1], "params_hash": parts[2], "version": parts[3] } key_generator = CacheKeyGenerator() ``` #### C. 
TTL Policies by Content Type ```python from enum import Enum class CacheTTL(Enum): """Cache TTL policies for different content types""" # Real-time content (changes rapidly) NEW_POSTS = 120 # 2 minutes HOT_POSTS = 300 # 5 minutes RISING_POSTS = 180 # 3 minutes # Historical content (stable) TOP_POSTS = 3600 # 1 hour SEARCH_RESULTS = 300 # 5 minutes # User/subreddit metadata (slow-changing) USER_INFO = 600 # 10 minutes SUBREDDIT_INFO = 3600 # 1 hour # Comments (relatively stable after initial activity) COMMENTS = 900 # 15 minutes # Trending/analysis (computationally expensive) TRENDING_TOPICS = 900 # 15 minutes SENTIMENT_ANALYSIS = 3600 # 1 hour @staticmethod def get_ttl(tool_name: str, params: dict) -> int: """Determine TTL based on tool and parameters""" if tool_name == "get_subreddit_posts": sort = params.get("sort", "hot") if sort == "new": return CacheTTL.NEW_POSTS.value elif sort == "hot": return CacheTTL.HOT_POSTS.value elif sort == "rising": return CacheTTL.RISING_POSTS.value else: # top, controversial return CacheTTL.TOP_POSTS.value elif tool_name == "search_reddit": return CacheTTL.SEARCH_RESULTS.value elif tool_name == "get_post_comments": return CacheTTL.COMMENTS.value elif tool_name == "get_trending_topics": return CacheTTL.TRENDING_TOPICS.value elif tool_name == "get_user_info": return CacheTTL.USER_INFO.value elif tool_name == "get_subreddit_info": return CacheTTL.SUBREDDIT_INFO.value elif tool_name == "analyze_sentiment": return CacheTTL.SENTIMENT_ANALYSIS.value # Default fallback return 300 # 5 minutes ``` #### D. Cache Operations ```python import pickle from typing import Optional, Any class CacheManager: def __init__(self, redis_client: redis.Redis): self.redis = redis_client async def get(self, key: str) -> Optional[dict]: """Retrieve cached value""" try: value = await self.redis.get(key) if value: # Parse stored data cached_data = json.loads(value) logger.debug( "Cache hit", key=key, age_seconds=cached_data.get("age_seconds", 0) ) return cached_data logger.debug("Cache miss", key=key) return None except Exception as e: logger.error(f"Cache get error: {e}", key=key) return None # Fail open (don't break on cache errors) async def set(self, key: str, value: Any, ttl: int): """Store value in cache with TTL""" try: # Add metadata cached_data = { "data": value, "cached_at": datetime.utcnow().isoformat(), "ttl": ttl } # Store with expiration await self.redis.setex( key, ttl, json.dumps(cached_data) ) logger.debug( "Cache set", key=key, ttl=ttl ) except Exception as e: logger.error(f"Cache set error: {e}", key=key) # Fail silently (cache write failures shouldn't break requests) async def delete(self, key: str): """Delete cached value""" try: await self.redis.delete(key) except Exception as e: logger.error(f"Cache delete error: {e}", key=key) async def get_or_fetch(self, key: str, fetch_func, ttl: int) -> dict: """ Get from cache or fetch and cache Args: key: Cache key fetch_func: Async function to fetch data if cache miss ttl: Time to live in seconds Returns: Cached or fetched data with metadata """ # Try cache first cached = await self.get(key) if cached: cache_age = ( datetime.utcnow() - datetime.fromisoformat(cached["cached_at"]) ).total_seconds() return { "data": cached["data"], "metadata": { "cached": True, "cache_age_seconds": int(cache_age), "ttl": cached["ttl"] } } # Cache miss - fetch data data = await fetch_func() # Store in cache await self.set(key, data, ttl) return { "data": data, "metadata": { "cached": False, "cache_age_seconds": 0, "ttl": ttl } } cache_manager = 
CacheManager(cache.client) ``` #### E. Cache Warming Strategy ```python class CacheWarmer: """Pre-populate cache with frequently accessed data""" WARM_SUBREDDITS = [ "all", "popular", "news", "technology", "worldnews", "AskReddit", "programming" ] async def warm_cache(self): """Warm cache on server startup""" logger.info("Starting cache warming") tasks = [] for subreddit in self.WARM_SUBREDDITS: # Warm hot posts tasks.append(self._warm_subreddit_posts(subreddit, "hot")) # Warm top posts tasks.append(self._warm_subreddit_posts(subreddit, "top")) await asyncio.gather(*tasks, return_exceptions=True) logger.info("Cache warming completed") async def _warm_subreddit_posts(self, subreddit: str, sort: str): """Warm cache for specific subreddit""" try: params = {"subreddit": subreddit, "sort": sort, "limit": 25} key = key_generator.generate("get_subreddit_posts", params) ttl = CacheTTL.get_ttl("get_subreddit_posts", params) # Check if already cached if await cache_manager.get(key): return # Fetch and cache data = await self._fetch_subreddit_posts(params) await cache_manager.set(key, data, ttl) logger.debug(f"Warmed cache for r/{subreddit} ({sort})") except Exception as e: logger.warning(f"Cache warming failed for r/{subreddit}: {e}") cache_warmer = CacheWarmer() ``` #### F. Cache Invalidation Logic ```python class CacheInvalidator: """Handle cache invalidation scenarios""" async def invalidate_tool(self, tool_name: str): """Invalidate all cached entries for a tool""" pattern = f"reddit:{tool_name}:*" await self._delete_pattern(pattern) async def invalidate_subreddit(self, subreddit: str): """Invalidate all cached entries for a subreddit""" # This is complex - would need secondary indexing # For MVP: rely on TTL expiration pass async def invalidate_on_error(self, key: str): """Invalidate cache entry if data is stale/invalid""" await cache_manager.delete(key) async def _delete_pattern(self, pattern: str): """Delete all keys matching pattern""" cursor = 0 while True: cursor, keys = await cache.client.scan( cursor=cursor, match=pattern, count=100 ) if keys: await cache.client.delete(*keys) if cursor == 0: break cache_invalidator = CacheInvalidator() ``` --- ### 2.4 Tool Implementation Layer #### Tool 1: search_reddit **Input Validation:** ```python class SearchRedditInput(BaseModel): query: str = Field(..., min_length=1, max_length=500) subreddit: Optional[str] = Field(None, pattern="^[A-Za-z0-9_]+$") time_filter: Literal["hour", "day", "week", "month", "year", "all"] = "week" sort: Literal["relevance", "hot", "top", "new", "comments"] = "relevance" limit: int = Field(25, ge=1, le=100) ``` **Business Logic:** ```python @mcp.tool() async def search_reddit(params: SearchRedditInput) -> dict: """Search Reddit for posts matching query""" # Generate cache key cache_key = key_generator.generate("search_reddit", params.dict()) ttl = CacheTTL.get_ttl("search_reddit", params.dict()) # Define fetch function async def fetch_results(): reddit = reddit_client.get_client() # Build search query if params.subreddit: search_target = reddit.subreddit(params.subreddit) else: search_target = reddit.subreddit("all") # Execute search results = await retry_handler.execute( search_target.search, query=params.query, sort=params.sort, time_filter=params.time_filter, limit=params.limit ) # Normalize results normalized = [ normalizer.normalize_post(post) for post in results ] return { "results": normalized, "query": params.query, "total_found": len(normalized) } # Get from cache or fetch response = await 
cache_manager.get_or_fetch(cache_key, fetch_results, ttl) # Add rate limit info response["metadata"]["rate_limit_remaining"] = rate_limiter.get_remaining() return response ``` **Reddit API Calls:** - Endpoint: `/search.json?q={query}&t={time}&sort={sort}&limit={limit}` - Rate: 1 API call per request **Caching Strategy:** - TTL: 5 minutes (search results change moderately) - Key pattern: `reddit:search_reddit:{hash}:v1` **Error Handling:** - Invalid query → ValidationError (422) - Subreddit not found → Return empty results - Rate limit → RateLimitError with retry_after --- #### Tool 2: get_subreddit_posts **Input Validation:** ```python class GetSubredditPostsInput(BaseModel): subreddit: str = Field(..., pattern="^[A-Za-z0-9_]+$") sort: Literal["hot", "new", "top", "rising", "controversial"] = "hot" time_filter: Optional[Literal["hour", "day", "week", "month", "year", "all"]] = None limit: int = Field(25, ge=1, le=100) @validator('time_filter') def validate_time_filter(cls, v, values): if values.get('sort') in ['top', 'controversial'] and not v: raise ValueError("time_filter required for top/controversial") return v ``` **Business Logic:** ```python @mcp.tool() async def get_subreddit_posts(params: GetSubredditPostsInput) -> dict: """Get posts from a specific subreddit""" cache_key = key_generator.generate("get_subreddit_posts", params.dict()) ttl = CacheTTL.get_ttl("get_subreddit_posts", params.dict()) async def fetch_posts(): reddit = reddit_client.get_client() subreddit = reddit.subreddit(params.subreddit) # Select sort method if params.sort == "hot": posts = subreddit.hot(limit=params.limit) elif params.sort == "new": posts = subreddit.new(limit=params.limit) elif params.sort == "top": posts = subreddit.top(time_filter=params.time_filter, limit=params.limit) elif params.sort == "rising": posts = subreddit.rising(limit=params.limit) elif params.sort == "controversial": posts = subreddit.controversial(time_filter=params.time_filter, limit=params.limit) # Execute with retry posts_list = await retry_handler.execute(lambda: list(posts)) # Normalize normalized = [normalizer.normalize_post(post) for post in posts_list] return { "subreddit": params.subreddit, "sort": params.sort, "posts": normalized } response = await cache_manager.get_or_fetch(cache_key, fetch_posts, ttl) response["metadata"]["rate_limit_remaining"] = rate_limiter.get_remaining() return response ``` **Reddit API Calls:** - Endpoint: `/r/{subreddit}/{sort}.json` - Rate: 1 API call per request **Caching Strategy:** - TTL: Variable by sort type (2min for new, 5min for hot, 1h for top) - High cache hit rate expected for popular subreddits --- #### Tool 3: get_post_comments **Business Logic:** ```python @mcp.tool() async def get_post_comments(params: GetPostCommentsInput) -> dict: """Get all comments from a Reddit post""" cache_key = key_generator.generate("get_post_comments", params.dict()) ttl = CacheTTL.COMMENTS.value async def fetch_comments(): reddit = reddit_client.get_client() # Extract post ID (handle t3_ prefix and URLs) post_id = params.post_id.replace("t3_", "") submission = reddit.submission(id=post_id) # Fetch with comment sort submission.comment_sort = params.sort # Expand all comment threads await retry_handler.execute(submission.comments.replace_more, limit=0) # Flatten comment forest all_comments = submission.comments.list() # Filter by max_depth if specified if params.max_depth > 0: all_comments = [c for c in all_comments if c.depth <= params.max_depth] # Build nested structure comments_tree = 
self._build_comment_tree(all_comments) return { "post": { "id": submission.id, "title": submission.title, "num_comments": submission.num_comments }, "comments": comments_tree, "metadata": { "total_comments": len(all_comments), "returned_comments": len(comments_tree) } } response = await cache_manager.get_or_fetch(cache_key, fetch_comments, ttl) return response def _build_comment_tree(comments: list) -> list: """Build nested comment structure""" comment_map = {} root_comments = [] # First pass: create all comment nodes for comment in comments: normalized = normalizer.normalize_comment(comment) normalized["replies"] = [] comment_map[comment.id] = normalized # Second pass: build tree for comment in comments: if comment.parent_id.startswith("t3_"): # Top-level comment root_comments.append(comment_map[comment.id]) else: parent_id = comment.parent_id.replace("t1_", "") if parent_id in comment_map: comment_map[parent_id]["replies"].append(comment_map[comment.id]) return root_comments ``` **Reddit API Calls:** - Endpoint: `/r/{subreddit}/comments/{id}.json` - Additional: `/api/morechildren` (for expanding threads) - Rate: 1-5 API calls (depends on thread size) **Caching Strategy:** - TTL: 15 minutes - Comments are relatively stable after initial activity --- #### Tool 4: get_trending_topics **Business Logic:** ```python @mcp.tool() async def get_trending_topics(params: GetTrendingTopicsInput) -> dict: """Identify trending topics on Reddit""" cache_key = key_generator.generate("get_trending_topics", params.dict()) ttl = CacheTTL.TRENDING_TOPICS.value async def analyze_trends(): reddit = reddit_client.get_client() # Determine scope if params.scope == "subreddit": target = reddit.subreddit(params.subreddit) else: target = reddit.subreddit("all") # Fetch recent posts (last hour or day) if params.timeframe == "hour": posts = target.new(limit=100) else: posts = target.top(time_filter="day", limit=200) posts_list = await retry_handler.execute(lambda: list(posts)) # Extract keywords from titles keyword_freq = {} keyword_posts = {} for post in posts_list: # Tokenize and extract keywords (simple approach) keywords = self._extract_keywords(post.title) for keyword in keywords: keyword_freq[keyword] = keyword_freq.get(keyword, 0) + 1 if keyword not in keyword_posts: keyword_posts[keyword] = [] keyword_posts[keyword].append(post) # Calculate trending score (frequency + recency) trending = [] for keyword, count in keyword_freq.items(): if count >= 3: # Minimum threshold sample_posts = keyword_posts[keyword][:3] trending.append({ "keyword": keyword, "mentions": count, "growth_rate": self._calculate_growth(keyword, posts_list), "sentiment": await self._quick_sentiment(keyword, sample_posts), "top_subreddits": self._top_subreddits(keyword_posts[keyword]), "sample_posts": [ { "id": p.id, "title": p.title, "score": p.score } for p in sample_posts ] }) # Sort by mentions trending.sort(key=lambda x: x["mentions"], reverse=True) return { "trending_topics": trending[:params.limit], "metadata": { "analysis_timestamp": int(datetime.utcnow().timestamp()), "posts_analyzed": len(posts_list), "unique_keywords": len(keyword_freq) } } response = await cache_manager.get_or_fetch(cache_key, analyze_trends, ttl) return response ``` **Reddit API Calls:** - Endpoint: `/r/{subreddit}/new.json` or `/r/{subreddit}/top.json` - Rate: 1-2 API calls **Caching Strategy:** - TTL: 15 minutes (computationally expensive) - High value for cache hits --- #### Tool 5: analyze_sentiment **Business Logic:** ```python from 
vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer class SentimentAnalyzer: def __init__(self): self.analyzer = SentimentIntensityAnalyzer() def analyze_text(self, text: str) -> dict: """Analyze sentiment of text using VADER""" scores = self.analyzer.polarity_scores(text) # Determine label if scores['compound'] >= 0.05: label = "positive" elif scores['compound'] <= -0.05: label = "negative" else: label = "neutral" return { "score": scores['compound'], "label": label, "confidence": max(scores['pos'], scores['neu'], scores['neg']), "distribution": { "positive": scores['pos'], "neutral": scores['neu'], "negative": scores['neg'] } } sentiment_analyzer = SentimentAnalyzer() @mcp.tool() async def analyze_sentiment(params: AnalyzeSentimentInput) -> dict: """Analyze sentiment of Reddit content""" cache_key = key_generator.generate("analyze_sentiment", params.dict()) ttl = CacheTTL.SENTIMENT_ANALYSIS.value async def perform_analysis(): # Fetch content based on content_type if params.content_type == "post": content = await self._fetch_post(params.content_id) texts = [content["title"], content.get("selftext", "")] elif params.content_type == "comment": content = await self._fetch_comment(params.content_id) texts = [content["body"]] elif params.content_type == "search_results": results = await search_reddit({ "query": params.content_id, "time_filter": params.time_filter, "limit": 100 }) texts = [ f"{r['title']} {r.get('selftext', '')}" for r in results["data"]["results"] ] # Analyze each text sentiments = [sentiment_analyzer.analyze_text(t) for t in texts if t] # Aggregate results avg_score = sum(s["score"] for s in sentiments) / len(sentiments) label_counts = {"positive": 0, "neutral": 0, "negative": 0} for s in sentiments: label_counts[s["label"]] += 1 return { "overall_sentiment": { "score": round(avg_score, 3), "label": max(label_counts, key=label_counts.get), "confidence": round(max(label_counts.values()) / len(sentiments), 2), "distribution": { k: round(v / len(sentiments), 2) for k, v in label_counts.items() } }, "analyzed_items": len(sentiments) } response = await cache_manager.get_or_fetch(cache_key, perform_analysis, ttl) return response ``` **Reddit API Calls:** - Variable: 0-2 API calls (depending on cached content) **Caching Strategy:** - TTL: 1 hour (sentiment doesn't change for historical content) - NLP processing is fast (VADER is rule-based) --- #### Tool 6-8: get_user_info, get_subreddit_info, watch_keywords Similar implementation patterns: - Input validation with Pydantic - Cache-first strategy - Reddit API integration via PRAW - Response normalization - Error handling **watch_keywords** is special: - Requires persistent storage (Apify Dataset) - Background job processing - Webhook/notification system (future enhancement) --- ## 3. 
Data Models ### 3.1 Input Schemas (JSON Schema Format) All tool inputs are defined using Pydantic models, which automatically generate JSON Schema: ```python from pydantic import BaseModel, Field, validator from typing import Optional, Literal, List # Example: search_reddit input schema class SearchRedditInput(BaseModel): query: str = Field( ..., min_length=1, max_length=500, description="Search query or keywords", example="artificial intelligence" ) subreddit: Optional[str] = Field( None, pattern="^[A-Za-z0-9_]+$", description="Limit search to specific subreddit", example="technology" ) time_filter: Literal["hour", "day", "week", "month", "year", "all"] = Field( "week", description="Time range for search results" ) sort: Literal["relevance", "hot", "top", "new", "comments"] = Field( "relevance", description="Sort order for results" ) limit: int = Field( 25, ge=1, le=100, description="Maximum number of results to return" ) class Config: schema_extra = { "example": { "query": "machine learning", "subreddit": "MachineLearning", "time_filter": "week", "sort": "top", "limit": 50 } } # Auto-generate JSON Schema search_reddit_schema = SearchRedditInput.schema_json(indent=2) ``` ### 3.2 Output Schemas ```python from typing import List, Dict, Any from datetime import datetime class RedditPost(BaseModel): id: str type: Literal["post"] title: str author: str subreddit: str created_utc: int score: int upvote_ratio: float num_comments: int url: str permalink: str selftext: str link_flair_text: Optional[str] is_self: bool is_video: bool over_18: bool spoiler: bool stickied: bool class RedditComment(BaseModel): id: str type: Literal["comment"] author: str body: str score: int created_utc: int depth: int parent_id: str is_submitter: bool stickied: bool distinguished: Optional[str] replies: List['RedditComment'] = [] class ResponseMetadata(BaseModel): cached: bool cache_age_seconds: int ttl: int rate_limit_remaining: int execution_time_ms: float reddit_api_calls: int class SearchRedditOutput(BaseModel): results: List[RedditPost] query: str total_found: int metadata: ResponseMetadata class SentimentAnalysis(BaseModel): score: float = Field(..., ge=-1, le=1) label: Literal["positive", "neutral", "negative"] confidence: float = Field(..., ge=0, le=1) distribution: Dict[str, float] class AnalyzeSentimentOutput(BaseModel): overall_sentiment: SentimentAnalysis analyzed_items: int key_themes: Optional[List[Dict[str, Any]]] metadata: ResponseMetadata ``` ### 3.3 Cache Data Structures ```python class CachedResponse(BaseModel): """Structure stored in Redis""" data: Any # Actual response data cached_at: str # ISO timestamp ttl: int # Original TTL in seconds version: str = "v1" # Schema version class CacheMetrics(BaseModel): """Cache performance metrics""" total_requests: int cache_hits: int cache_misses: int hit_rate: float avg_cache_age_seconds: float @property def cache_hit_percentage(self) -> float: if self.total_requests == 0: return 0.0 return (self.cache_hits / self.total_requests) * 100 ``` ### 3.4 Internal State Models ```python class RateLimitState(BaseModel): """Track rate limit state""" calls_made: int window_start: datetime remaining_calls: int reset_at: datetime class RequestQueueItem(BaseModel): """Item in request queue""" request_hash: str endpoint: str params: Dict[str, Any] priority: int queued_at: datetime status: Literal["queued", "processing", "completed", "failed"] class OAuthTokenState(BaseModel): """OAuth token state""" access_token: str refresh_token: Optional[str] expires_at: datetime 
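    # Added note: expires_at reflects Reddit's one-hour token lifetime; OAuth2TokenManager
    # treats the token as stale five minutes before this timestamp (see _is_token_expired).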
scopes: List[str] ``` --- ## 4. Component Interaction Flows ### 4.1 Tool Execution Flow (Happy Path) ``` ┌─────────┐ │ Client │ └────┬────┘ │ 1. JSON-RPC Request │ {"method": "tools/call", "params": {"name": "search_reddit", ...}} ▼ ┌─────────────────┐ │ FastMCP Server │ └────┬────────────┘ │ 2. Route to tool handler ▼ ┌──────────────────┐ │ Input Validation │ (Pydantic) └────┬─────────────┘ │ 3. Validate params ▼ ┌──────────────────┐ │ Middleware Stack │ └────┬─────────────┘ │ 4. Check auth, rate limits ▼ ┌──────────────────┐ │ Tool Function │ (search_reddit) └────┬─────────────┘ │ 5. Generate cache key │ redis:search_reddit:a3f8d9c2:v1 ▼ ┌──────────────────┐ │ Cache Manager │ └────┬─────────────┘ │ 6. Check Redis ▼ ┌─┴─┐ │ ? │ Cache Hit? └─┬─┘ │ ├─── YES ────────────────────────┐ │ │ │ ▼ │ ┌──────────────┐ │ │ Return Cache │ │ │ + metadata │ │ └──────┬───────┘ │ │ │ ├─────► Response │ └─── NO ─────────────────────────┐ │ ▼ ┌─────────────────┐ │ Rate Limiter │ │ (Token Bucket) │ └────┬────────────┘ │ 7. Acquire token (wait if needed) ▼ ┌─────────────────┐ │ Reddit API │ │ Client (PRAW) │ └────┬────────────┘ │ 8. Execute request │ - OAuth2 auth │ - Retry with backoff ▼ ┌─────────────────┐ │ Reddit API │ └────┬────────────┘ │ 9. HTTP 200 + JSON ▼ ┌─────────────────┐ │ Response │ │ Normalizer │ └────┬────────────┘ │ 10. Normalize to standard format ▼ ┌─────────────────┐ │ Cache Manager │ └────┬────────────┘ │ 11. Store in Redis (TTL=5min) ▼ ┌─────────────────┐ │ Add Metadata │ │ - cached: false │ │ - api_calls: 1 │ └────┬────────────┘ │ └─────► Response │ ▼ ┌─────────────────┐ │ Monitoring │ │ - Log metrics │ │ - Apify Dataset │ └─────────────────┘ │ ▼ ┌─────────────────┐ │ Client │ │ (Receives data) │ └─────────────────┘ ``` **Text Sequence:** 1. Client sends JSON-RPC request to MCP server 2. FastMCP routes request to `search_reddit` tool 3. Pydantic validates input parameters 4. Middleware checks authentication and rate limits 5. Tool generates cache key: `reddit:search_reddit:{hash}:v1` 6. Cache Manager checks Redis for cached data 7. **Cache Hit:** Return cached response with metadata (age, TTL) 8. **Cache Miss:** Acquire rate limit token (may wait) 9. PRAW executes Reddit API request with OAuth2 10. Reddit API returns JSON response 11. Response Normalizer converts to standard format 12. Cache Manager stores result in Redis with TTL 13. Add metadata (cached: false, api_calls: 1) 14. Return response to client 15. Log metrics to monitoring system --- ### 4.2 Rate Limit Handling Flow ``` ┌─────────────┐ │ Tool Exec │ └──────┬──────┘ │ Request needs API call ▼ ┌──────────────────┐ │ Rate Limiter │ │ (Token Bucket) │ └──────┬───────────┘ │ Check capacity ▼ ┌───┴───┐ │ Calls │ < 100? │ in │ │ window│ └───┬───┘ │ ├─── YES ──────────────────────┐ │ │ │ ▼ │ ┌───────────────┐ │ │ Grant Token │ │ │ - Add timestamp│ │ │ - Execute call│ │ └───────┬───────┘ │ │ │ └────► Continue │ └─── NO ───────────────────────┐ │ ▼ ┌──────────────────┐ │ Calculate Wait │ │ time = (oldest │ │ call + 60s) - │ │ now │ └────┬─────────────┘ │ ▼ ┌──────────────────┐ │ Log Warning │ │ "Rate limit hit, │ │ waiting Xs" │ └────┬─────────────┘ │ ▼ ┌──────────────────┐ │ async sleep(X) │ └────┬─────────────┘ │ ▼ ┌──────────────────┐ │ Retry acquire() │ └────┬─────────────┘ │ └────► Continue ┌─────────────────────────────────────────────────┐ │ Alternative: Queue Request │ │ │ │ If rate limit hit: │ │ 1. Add request to priority queue │ │ 2. Return to client: "Queued, try again in Xs" │ │ 3. 
Background worker processes queue │ │ 4. Client polls for result │ └─────────────────────────────────────────────────┘ ``` **Rate Limit Error Response:** ```json { "jsonrpc": "2.0", "id": 123, "error": { "code": -32000, "message": "Rate limit exceeded", "data": { "retry_after_seconds": 15, "rate_limit_tier": "free", "calls_made": 100, "window_reset_at": "2025-11-05T12:35:00Z" } } } ``` --- ### 4.3 Cache Hit/Miss Scenarios **Scenario A: Cache Hit** ``` Request → Cache Check → Redis GET → Found ↓ Parse cached data ↓ Calculate age (now - cached_at) ↓ Return {data, metadata: {cached: true, age: 120s}} Total time: ~10-50ms Reddit API calls: 0 ``` **Scenario B: Cache Miss** ``` Request → Cache Check → Redis GET → Not Found ↓ Execute fetch function ↓ Rate limit check (may wait) ↓ Reddit API call (1-3s) ↓ Normalize response ↓ Store in Redis (SET key value EX ttl) ↓ Return {data, metadata: {cached: false, age: 0}} Total time: ~1-5s Reddit API calls: 1-5 (depending on tool) ``` **Scenario C: Cache Expired** ``` Request → Cache Check → Redis GET → Found but TTL expired ↓ Redis auto-deleted (already gone) ↓ Treat as Cache Miss ↓ [Same as Scenario B] ``` **Scenario D: Cache Error (Fail Open)** ``` Request → Cache Check → Redis GET → Connection Error ↓ Log error ↓ Treat as Cache Miss (don't block request) ↓ Execute fetch function ↓ Try to cache (fail silently if Redis still down) ↓ Return response (without cache metadata) Note: System continues to function even if Redis is down ``` --- ### 4.4 Error Recovery Flow ``` ┌─────────────┐ │ Tool Exec │ └──────┬──────┘ │ ▼ Try Block │ ├──► ValidationError (422) │ ↓ │ Return JSON-RPC error {code: -32602, message: "Invalid params"} │ ├──► RateLimitError (429) │ ↓ │ Calculate retry_after from rate limiter │ ↓ │ Return JSON-RPC error {code: -32000, data: {retry_after_seconds}} │ ├──► RedditAPIError (500/502/503) │ ↓ │ Check retry attempts │ ↓ │ ┌─ Attempt < 3 ───► Exponential backoff │ │ (wait 2^attempt seconds) │ │ ↓ │ │ Retry request │ │ │ └─ Attempt >= 3 ───► Return JSON-RPC error │ {code: -32001, message: "Reddit API unavailable"} │ ├──► NotFoundError (404) │ ↓ │ Return empty results (not an error) │ ↓ │ Return {results: [], metadata: {...}} │ ├──► ForbiddenError (403) │ ↓ │ Check if auth token expired │ ↓ │ ┌─ Token expired ───► Refresh token │ │ ↓ │ │ Retry request (once) │ │ │ └─ Permission denied ─► Return JSON-RPC error │ {code: -32001, message: "Access forbidden"} │ ├──► CacheError │ ↓ │ Log warning (cache unavailable) │ ↓ │ Continue without cache (fail open) │ ↓ │ Execute request normally │ └──► Unexpected Error ↓ Log full traceback ↓ Return generic JSON-RPC error {code: -32603, message: "Internal server error"} ┌─────────────────────────────────────────────────┐ │ Circuit Breaker Pattern (Future Enhancement) │ │ │ │ Track Reddit API error rate: │ │ - If error rate > 50% over 1 minute │ │ - Open circuit (stop calling API) │ │ - Return cached data (even if stale) │ │ - Retry after 30 seconds │ │ - Close circuit when healthy │ └─────────────────────────────────────────────────┘ ``` --- ## 5. 
Technology Stack Justification ### 5.1 Python 3.11 (Language) **Why chosen:** - **MCP SDK Support**: Official `mcp` Python SDK from Anthropic - **Reddit API Library**: PRAW is the most mature Reddit API wrapper (Python-native) - **Data Science Ecosystem**: VADER sentiment analysis, NLP tools - **Async/Await**: Native async support (asyncio) for concurrent operations - **Type Safety**: Type hints + Pydantic for robust validation - **Apify SDK**: Full support for Apify Actor development **Alternatives considered:** - JavaScript/TypeScript: Good MCP support, but PRAW equivalent (Snoowrap) is unmaintained - Go: Fast but less mature MCP ecosystem, no VADER equivalent **Performance characteristics:** - Async I/O: Handle 100+ concurrent requests - Memory efficient: ~50-100MB base footprint - Fast enough: <1ms overhead vs C/Rust for I/O-bound tasks --- ### 5.2 FastMCP (MCP Framework) **Why chosen:** - **Official Framework**: Developed by Anthropic specifically for Python MCP servers - **Minimal Boilerplate**: Decorator-based tool registration - **Auto-validation**: Integrates with Pydantic for input/output validation - **Transport Agnostic**: Supports stdio (local) and HTTP/SSE (remote) - **Active Development**: Regular updates, aligned with MCP spec changes **Example benefit:** ```python # Without FastMCP: 50+ lines of JSON-RPC handling # With FastMCP: @mcp.tool() def search_reddit(query: str) -> dict: return {"results": []} # Framework handles everything else ``` **Alternatives considered:** - Custom JSON-RPC implementation: Too much overhead - Generic RPC frameworks: Not MCP-specific --- ### 5.3 PRAW (Reddit API Client) **Why chosen:** - **Battle-tested**: 10+ years of development, used by thousands of apps - **Auto Rate Limiting**: Built-in 30s caching and rate limit handling - **OAuth2 Handling**: Automatic token refresh - **Pythonic API**: Intuitive, well-documented - **Active Maintenance**: Regular updates for Reddit API changes **Example:** ```python # PRAW: 3 lines reddit = praw.Reddit(client_id=..., client_secret=...) 
posts = reddit.subreddit("technology").hot(limit=25) # Manual approach: 100+ lines (OAuth, pagination, error handling) ``` **Alternatives considered:** - Raw requests: Too much manual work - Snoowrap (JS): Unmaintained, last update 4 years ago **Performance:** - Handles pagination automatically - Connection pooling built-in - Lazy loading (doesn't fetch data until accessed) --- ### 5.4 Redis (Cache) **Why chosen:** - **Speed**: Sub-millisecond reads (avg 0.2ms) - **TTL Support**: Built-in expiration (perfect for time-based caching) - **Atomic Operations**: SET/GET are thread-safe - **Persistence Options**: Can survive restarts (AOF/RDB) - **Scalability**: Handles millions of keys easily - **Apify Compatible**: Easy to deploy Redis Cloud instance **Cache performance:** ``` Cache Hit: 10-50ms (Redis GET + deserialization) Cache Miss: 1,000-3,000ms (Reddit API call) Speedup: 20-300x faster ``` **Alternatives considered:** - In-memory dict: Lost on restart, no TTL, not distributed - Memcached: No persistence, less flexible data structures - DynamoDB: Higher latency (~10ms), more expensive **Configuration:** - Connection pooling: 20 connections - Maxmemory policy: `allkeys-lru` (evict least recently used) - Persistence: AOF (append-only file) for durability --- ### 5.5 Pydantic (Data Validation) **Why chosen:** - **Type Safety**: Runtime validation of inputs/outputs - **Auto JSON Schema**: Generate OpenAPI-compatible schemas - **FastMCP Integration**: Native support in FastMCP - **Error Messages**: Clear, actionable validation errors - **Performance**: Fast (uses Rust under the hood in v2) **Example:** ```python class SearchInput(BaseModel): query: str = Field(..., min_length=1, max_length=500) limit: int = Field(25, ge=1, le=100) # Auto-validates: try: params = SearchInput(query="", limit=500) # Fails validation except ValidationError as e: # Returns: {"query": ["ensure this value has at least 1 characters"]} ``` **Benefits:** - Prevents invalid data from reaching Reddit API - Generates documentation automatically - Type hints improve IDE autocomplete --- ### 5.6 VADER (Sentiment Analysis) **Why chosen:** - **Speed**: Rule-based (no ML model), <1ms per text - **Social Media Optimized**: Designed for Twitter/Reddit (handles slang, emojis) - **No Training**: Works out-of-the-box - **Lightweight**: 1MB library, no GPU needed - **Academic Credibility**: 5,000+ citations **Performance:** ```python from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer analyzer = SentimentIntensityAnalyzer() analyzer.polarity_scores("This is amazing!") # {'neg': 0.0, 'neu': 0.344, 'pos': 0.656, 'compound': 0.6588} # Time: <1ms ``` **Alternatives considered:** - Transformer models (BERT): 100-1000x slower, requires GPU - TextBlob: Less accurate for social media - Cloud APIs (AWS Comprehend): Expensive, adds latency **Limitations:** - No context understanding (sarcasm detection poor) - English-only - Good enough for MVP, can enhance later --- ### 5.7 Technology Stack Summary Table | Component | Technology | Version | Purpose | Key Benefit | |-----------|-----------|---------|---------|-------------| | Language | Python | 3.11+ | Runtime | Async support, rich ecosystem | | MCP Framework | FastMCP | 1.0+ | Server | Minimal boilerplate, official | | Reddit Client | PRAW | 7.7+ | API | Auto rate limiting, OAuth2 | | Cache | Redis | 7.0+ | Performance | Sub-ms reads, TTL support | | Validation | Pydantic | 2.0+ | Data | Runtime validation, JSON Schema | | Sentiment | VADER | 3.3+ | NLP | Fast, social media 
optimized |
| HTTP Server | Uvicorn | 0.25+ | ASGI | High performance async |
| Logging | structlog | 23.0+ | Observability | Structured JSON logs |
| Testing | pytest | 7.4+ | Quality | Async test support |
| Deployment | Apify Actor | Latest | Hosting | Standby mode, pay-per-event |

**Total Dependencies:** ~15 packages (lightweight)
**Docker Image Size:** ~200MB (Alpine-based)
**Memory Footprint:** ~100-150MB (idle)
**Startup Time:** <2 seconds

---

## 6. Performance & Scalability

### 6.1 Performance Targets

| Metric | Target | Measurement |
|--------|--------|-------------|
| Latency (cached) | <500ms | p95 |
| Latency (uncached) | <3s | p95 |
| Cache hit rate | >75% | Overall |
| Throughput | 100 req/min | Reddit API limit |
| Availability | 99.5% | Monthly uptime |
| Error rate | <1% | Of total requests |

### 6.2 Scalability Analysis

**Current Architecture (Single Instance):**
- Reddit API: 100 QPM limit
- With 75% cache hit: 400 effective QPM
- Avg user: 10 requests/day
- **Capacity: ~1,700 DAU** (conservative planning figure; the raw ceiling implied by 400 effective QPM is 400 × 60 × 24 ÷ 10 ≈ 57,600 request-limited DAU, discounted for peak-hour bursts and tools that cost several API calls per request)

**Scaling Strategies:**

1. **Horizontal Scaling (Multiple Reddit Accounts)**
   - Deploy 5 instances with different Reddit API keys
   - Capacity: 8,500 DAU
   - Cost: $0 (free tier per account)

2. **Cache Optimization**
   - Improve cache hit rate to 90%
   - Capacity doubles: 3,400 DAU per instance

3. **Paid Reddit API Tier**
   - $0.24 per 1K calls = unlimited QPM
   - Capacity: Unlimited (budget-dependent)

### 6.3 Bottleneck Analysis

**Primary Bottleneck:** Reddit API rate limit (100 QPM)
- Mitigation: Aggressive caching (75%+ hit rate)
- Fallback: Queue requests, return "retry in X seconds"

**Secondary Bottleneck:** Redis throughput
- Redis capacity: 100K ops/sec
- Our usage: <100 ops/sec
- Not a concern until 1,000x scale

**Tertiary Bottleneck:** Sentiment analysis (NLP)
- VADER throughput: 10K texts/sec
- Our usage: <100 texts/sec
- Not a concern

---

## 7. 
Security Architecture ### 7.1 Authentication & Authorization **Reddit API Credentials:** - Stored in Apify Actor secrets (encrypted at rest) - Rotated quarterly - Never logged or exposed in responses **User Authentication:** - Phase 1 (MVP): No user auth (open access) - Phase 2 (v1.0): API key per user - Phase 3 (v2.0): OAuth2 for user-specific operations **Authorization Levels:** ```python class AccessTier(Enum): FREE = "free" # 10K calls/month PRO = "pro" # 100K calls/month BUSINESS = "business" # 2M calls/month ENTERPRISE = "enterprise" # Unlimited ``` ### 7.2 Input Validation & Sanitization **All inputs validated:** - Length limits (prevent DoS) - Pattern matching (prevent injection) - Type checking (prevent crashes) **Example:** ```python class SearchInput(BaseModel): query: str = Field(..., min_length=1, max_length=500) @validator('query') def sanitize(cls, v): # Remove null bytes v = v.replace('\x00', '') # Strip whitespace v = v.strip() # Limit to printable characters v = ''.join(c for c in v if c.isprintable()) return v ``` ### 7.3 Rate Limiting (User-Level) ```python class UserRateLimiter: def __init__(self, redis_client): self.redis = redis_client async def check_limit(self, user_id: str, tier: AccessTier) -> bool: """Check if user exceeded rate limit""" key = f"rate_limit:{user_id}:{datetime.utcnow().strftime('%Y-%m')}" # Get current usage usage = await self.redis.get(key) or 0 # Check tier limit tier_limits = { AccessTier.FREE: 10_000, AccessTier.PRO: 100_000, AccessTier.BUSINESS: 2_000_000, AccessTier.ENTERPRISE: float('inf') } if int(usage) >= tier_limits[tier]: raise RateLimitError(f"Monthly limit exceeded") # Increment usage await self.redis.incr(key) await self.redis.expire(key, 31 * 24 * 60 * 60) # 31 days return True ``` ### 7.4 Data Privacy **PII Handling:** - No user data stored (stateless) - Reddit usernames treated as public (per Reddit ToS) - No email/IP logging - GDPR-compliant (no personal data retention) **Caching Privacy:** - Cache keys hashed (don't expose query content) - TTL-based expiration (no long-term storage) - No cross-user data leakage --- ## 8. Deployment Architecture ### 8.1 Apify Actor Configuration **Dockerfile:** ```dockerfile FROM apify/actor-python:3.11 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . 
./ CMD ["python", "src/main.py"] ``` **actor.json:** ```json { "actorSpecification": 1, "name": "reddit-mcp-server", "title": "Reddit MCP Server", "version": "1.0.0", "usesStandbyMode": true, "webServerMcpPath": "/mcp", "environmentVariables": { "REDDIT_CLIENT_ID": { "type": "secret", "required": true }, "REDDIT_CLIENT_SECRET": { "type": "secret", "required": true }, "REDIS_URL": { "type": "string", "required": true } } } ``` ### 8.2 Infrastructure Components ``` ┌─────────────────────────────────────────────────┐ │ Apify Platform │ │ │ │ ┌────────────────────────────────────────┐ │ │ │ Actor Instance (Standby Mode) │ │ │ │ - Always-on HTTP endpoint │ │ │ │ - Auto-scaling (1-10 instances) │ │ │ │ - Health checks every 30s │ │ │ └────────────────────────────────────────┘ │ │ │ │ ┌────────────────────────────────────────┐ │ │ │ Key-Value Store (Actor secrets) │ │ │ │ - Reddit credentials │ │ │ │ - API keys │ │ │ └────────────────────────────────────────┘ │ │ │ │ ┌────────────────────────────────────────┐ │ │ │ Dataset (Analytics) │ │ │ │ - Request logs │ │ │ │ - Performance metrics │ │ │ └────────────────────────────────────────┘ │ └─────────────────────────────────────────────────┘ │ │ Network ▼ ┌─────────────────────────────────────────────────┐ │ Redis Cloud │ │ - Plan: 250MB (Free tier) │ │ - Region: us-east-1 │ │ - Eviction: allkeys-lru │ │ - Persistence: AOF │ └─────────────────────────────────────────────────┘ ``` ### 8.3 Monitoring & Observability **Metrics Tracked:** - Request count (by tool, by user) - Latency (p50, p95, p99) - Cache hit rate - Error rate (by type) - Reddit API quota usage **Logging:** ```python import structlog logger = structlog.get_logger() # Structured logs logger.info( "tool_execution", tool="search_reddit", user_id="user_123", duration_ms=1234, cached=False, error=None ) ``` **Alerts:** - Error rate >5% (1 minute) - Cache hit rate <50% (5 minutes) - Reddit API quota >80% - Response time p95 >5s ### 8.4 Cost Estimate | Component | Tier | Cost/Month | |-----------|------|------------| | Apify Actor | Free (10GB-hrs) | $0 | | Apify Actor | Paid (overage) | $0-50 | | Redis Cloud | 250MB | $0 | | Reddit API | Free (100 QPM) | $0 | | Reddit API | Paid (overage) | $0-200 | | **Total (MVP)** | | **$0-250** | **Break-even Analysis:** - $19/month Pro tier: 50 users needed - $99/month Business tier: 10 users needed - Target: 100 paying users by Month 3 = $2,000 MRR --- ## Appendix: Implementation Checklist ### Week 1: MVP Foundation - [ ] Set up Apify Actor project - [ ] Configure Reddit API credentials - [ ] Initialize FastMCP server - [ ] Implement Redis caching layer - [ ] Build rate limiter (token bucket) - [ ] Create Tool 1: search_reddit - [ ] Create Tool 2: get_subreddit_posts - [ ] Write unit tests - [ ] Deploy to Apify (standby mode) ### Week 2: Core Tools - [ ] Create Tool 3: get_post_comments - [ ] Create Tool 4: get_trending_topics - [ ] Implement error handling middleware - [ ] Add monitoring/logging (Apify datasets) - [ ] Cache warming on startup - [ ] Integration tests - [ ] Documentation (README, API reference) - [ ] Beta testing with 10 users ### Week 3: Monetization - [ ] Create Tool 5: analyze_sentiment (VADER) - [ ] Create Tool 6: get_user_info - [ ] Implement user authentication (API keys) - [ ] Usage tracking and billing integration - [ ] Create landing page - [ ] Pricing tier enforcement - [ ] Analytics dashboard - [ ] Submit to Apify Store ### Week 4: Polish & Launch - [ ] Create Tool 7: get_subreddit_info - [ ] Create Tool 8: watch_keywords 
(background jobs)
- [ ] Performance optimization (cache hit rate >75%)
- [ ] Security audit
- [ ] Load testing (100 concurrent users)
- [ ] Final documentation review
- [ ] Marketing push (Product Hunt, HN)
- [ ] Monitor metrics, iterate

---

**End of System Architecture Document**
**Version:** 1.0
**Status:** Ready for Implementation
**Next Steps:** Begin Week 1 development sprint
