Skip to main content
Glama
freshrss_mcp_server.py17.2 kB
#!/usr/bin/env python3 """ FreshRSS MCP Server A Model Context Protocol server for interacting with FreshRSS via its Google Reader compatible API. Provides tools for authentication, browsing feeds, reading articles, and managing subscriptions. """ import asyncio import aiohttp import argparse import json import logging import os import sys from typing import Any, Dict, List, Optional, Union from urllib.parse import urlencode, urljoin from datetime import datetime from functools import wraps from fastmcp import FastMCP # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize the MCP server mcp = FastMCP("FreshRSS") # --- Configuration and Session Management --- class FreshRSSConfig: def __init__(self): self.base_url: Optional[str] = None self.auth_token: Optional[str] = None self.api_token: Optional[str] = None self.session: Optional[aiohttp.ClientSession] = None async def get_session(self) -> aiohttp.ClientSession: if self.session is None or self.session.closed: self.session = aiohttp.ClientSession() return self.session def get_headers(self) -> Dict[str, str]: headers = {"Content-Type": "application/x-www-form-urlencoded"} if self.auth_token: headers["Authorization"] = f"GoogleLogin auth={self.auth_token}" return headers async def authenticate(self, base_url: str, email: str, password: str) -> bool: """Authenticate with FreshRSS server and store credentials.""" try: self.base_url = base_url.rstrip('/') session = await self.get_session() # Clear old tokens before re-authenticating self.auth_token = None self.api_token = None auth_url = urljoin(self.base_url, '/api/greader.php/accounts/ClientLogin') auth_data = {"Email": email, "Passwd": password} async with session.post(auth_url, data=auth_data) as response: if response.status != 200: logger.error(f"Authentication failed with status: {response.status}") return False auth_text = await response.text() auth_info = {key: value for key, value in (line.split('=', 1) for line in auth_text.strip().split('\n') if '=' in line)} if 'Auth' not in auth_info: logger.error("Authentication response did not contain 'Auth' token.") return False self.auth_token = auth_info['Auth'] token_url = urljoin(self.base_url, '/api/greader.php/reader/api/0/token') async with session.get(token_url, headers=self.get_headers()) as token_response: if token_response.status == 200: self.api_token = (await token_response.text()).strip() logger.info(f"Successfully authenticated with FreshRSS at {self.base_url}") return True except aiohttp.ClientError as e: logger.exception(f"HTTP error during authentication: {e}") return False except Exception as e: logger.exception("An unexpected error occurred during authentication") return False config = FreshRSSConfig() # --- API Request Handling and Decorators --- async def _api_request(method: str, url_path: str, **kwargs) -> Dict[str, Any]: """ A centralized function to handle API requests, including authentication and re-authentication. """ await ensure_authenticated() session = await config.get_session() url = urljoin(config.base_url, url_path) # For POST requests that need the API token, add it to the data payload if method.upper() == 'POST' and kwargs.get("requires_token"): data = kwargs.get("data", {}) if not config.api_token: return {"error": "Missing API token for POST request."} data["T"] = config.api_token kwargs["data"] = data async def perform_request(): return await session.request(method, url, headers=config.get_headers(), **kwargs) response = await perform_request() # Handle expired token: re-authenticate and retry once. if response.status == 401: logger.warning("Token expired (401 Unauthorized). Re-authenticating...") await ensure_authenticated(force=True) response = await perform_request() # Retry the request if response.status != 200: return {"error": f"API request failed: {response.status} {await response.text()}"} try: return await response.json() except (json.JSONDecodeError, aiohttp.ContentTypeError): return {"success": True, "message": await response.text()} def authenticated_tool(func): """Decorator to handle authentication and error handling for MCP tools.""" @wraps(func) async def wrapper(*args, **kwargs): try: return await func(*args, **kwargs) except Exception as e: logger.exception(f"Error in tool '{func.__name__}'") return {"error": str(e)} return wrapper # --- MCP Tools --- @mcp.tool() @authenticated_tool async def get_user_info() -> Dict[str, Any]: """ Get information about the authenticated user. """ return await _api_request("get", '/api/greader.php/reader/api/0/user-info') @mcp.tool() @authenticated_tool async def list_all_subscriptions() -> Dict[str, Any]: """ List all RSS feed subscriptions. """ data = await _api_request("get", '/api/greader.php/reader/api/0/subscription/list') if "subscriptions" in data: data["subscriptions"] = [{ "id": sub.get("id", ""), "title": sub.get("title", ""), "url": sub.get("url", ""), "htmlUrl": sub.get("htmlUrl", ""), "categories": [cat.get("label", "") for cat in sub.get("categories", [])] } for sub in data["subscriptions"]] data["count"] = len(data["subscriptions"]) return data @mcp.tool() @authenticated_tool async def list_subscriptions_by_category(category: str) -> Dict[str, Any]: """ List RSS feed subscriptions filtered by a specific category. Args: category: The category to filter subscriptions by (case-insensitive). """ data = await _api_request("get", '/api/greader.php/reader/api/0/subscription/list') if "subscriptions" in data: all_subscriptions = [{ "id": sub.get("id", ""), "title": sub.get("title", ""), "url": sub.get("url", ""), "htmlUrl": sub.get("htmlUrl", ""), "categories": [cat.get("label", "") for cat in sub.get("categories", [])] } for sub in data["subscriptions"]] category_lower = category.lower() filtered_subs = [ sub for sub in all_subscriptions if any(cat.lower() == category_lower for cat in sub["categories"]) ] data["subscriptions"] = filtered_subs data["count"] = len(filtered_subs) return data @mcp.tool() @authenticated_tool async def add_subscription(feed_url: str) -> Dict[str, Any]: """ Add a new RSS feed subscription. """ data = {"quickadd": feed_url} result = await _api_request("post", '/api/greader.php/reader/api/0/subscription/quickadd', data=data) if "error" not in result: return { "success": True, "message": f"Successfully added subscription: {result.get('streamName', feed_url)}", "details": result } return result @mcp.tool() @authenticated_tool async def get_unread_counts() -> Dict[str, Any]: """ Get unread article counts for all subscriptions and categories. """ data = await _api_request("get", '/api/greader.php/reader/api/0/unread-count') if "unreadcounts" in data: total_unread = sum(item.get("count", 0) for item in data["unreadcounts"]) return { "total_unread": total_unread, "max_items": data.get("max", 0), "unread_counts": [{ "id": item.get("id", ""), "count": item.get("count", 0), "newest_timestamp": item.get("newestItemTimestampUsec", "") } for item in data["unreadcounts"]] } return data def _format_articles(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Helper to format a list of article items.""" formatted_articles = [] for item in items: article = { "id": item.get("id", ""), "title": item.get("title", ""), "author": item.get("author", ""), "published": datetime.fromtimestamp(item.get("published", 0)).isoformat() if item.get("published") else "", "updated": datetime.fromtimestamp(item.get("updated", 0)).isoformat() if item.get("updated") else "", "summary": item.get("summary", {}).get("content", ""), "url": (item.get("alternate", [{}])[0].get("href", "")), "feed_title": item.get("origin", {}).get("title", ""), "feed_url": item.get("origin", {}).get("htmlUrl", "") } formatted_articles.append(article) return formatted_articles async def _fetch_articles(stream_id: str, count: int, sort_order: str, exclude_target: Optional[str], continuation: Optional[str]) -> Dict[str, Any]: """Internal function to fetch and format articles.""" params = {"n": count, "r": sort_order} if exclude_target: params["xt"] = exclude_target if continuation: params["c"] = continuation # URL-encode the stream_id to handle special characters encoded_stream_id = urlencode({"a": stream_id})[2:] url_path = f'/api/greader.php/reader/api/0/stream/contents/{encoded_stream_id}' data = await _api_request("get", url_path, params=params) if "items" in data: formatted_articles = _format_articles(data["items"]) return { "stream_id": data.get("id", stream_id), "updated": datetime.fromtimestamp(data.get("updated", 0)).isoformat() if data.get("updated") else "", "article_count": len(formatted_articles), "articles": formatted_articles, "continuation": data.get("continuation", "") } return data @mcp.tool() @authenticated_tool async def get_articles(stream_id: str = "user/-/state/com.google/reading-list", count: int = 20, sort_order: str = "d", exclude_target: Optional[str] = None, continuation: Optional[str] = None) -> Dict[str, Any]: """ Get articles from a specific stream (feed, category, or reading list). """ return await _fetch_articles(stream_id, count, sort_order, exclude_target, continuation) @mcp.tool() @authenticated_tool async def search_articles(query: str, count: int = 20, search_in_stream: str = "user/-/state/com.google/reading-list") -> Dict[str, Any]: """ Search for articles containing specific keywords by paginating through a stream. """ query_lower = query.lower() matching_articles = [] continuation = None while len(matching_articles) < count: # Call the internal fetch function directly page_result = await _fetch_articles( stream_id=search_in_stream, count=100, # Fetch a larger page size to reduce round trips sort_order='d', exclude_target=None, continuation=continuation ) if "error" in page_result or not page_result.get("articles"): break # Stop if there's an error or no more articles for article in page_result["articles"]: if query_lower in article.get("title", "").lower() or query_lower in article.get("summary", "").lower(): matching_articles.append(article) if len(matching_articles) >= count: break continuation = page_result.get("continuation") if not continuation: break # No more pages return { "query": query, "match_count": len(matching_articles), "articles": matching_articles } @mcp.tool() @authenticated_tool async def mark_article_read(article_id: str) -> Dict[str, Any]: """ Mark a specific article as read. """ data = {"i": article_id, "a": "user/-/state/com.google/read"} result = await _api_request("post", '/api/greader.php/reader/api/0/edit-tag', data=data, requires_token=True) if "error" not in result: return {"success": True, "message": f"Article {article_id} marked as read"} return result @mcp.tool() @authenticated_tool async def mark_article_starred(article_id: str, starred: bool = True) -> Dict[str, Any]: """ Mark or unmark an article as starred. """ tag = "user/-/state/com.google/starred" data = {"i": article_id} if starred: data["a"] = tag else: data["r"] = tag result = await _api_request("post", '/api/greader.php/reader/api/0/edit-tag', data=data, requires_token=True) if "error" not in result: action = "starred" if starred else "unstarred" return {"success": True, "message": f"Article {article_id} {action}"} return result @mcp.tool() @authenticated_tool async def get_starred_articles(count: int = 20) -> Dict[str, Any]: """ Get all starred articles. """ return await get_articles(stream_id="user/-/state/com.google/starred", count=count) @mcp.tool() @authenticated_tool async def mark_all_as_read(stream_id: str) -> Dict[str, Any]: """ Mark all articles in a stream as read. """ data = {"s": stream_id} result = await _api_request("post", '/api/greader.php/reader/api/0/mark-all-as-read', data=data, requires_token=True) if "error" not in result: return {"success": True, "message": f"All articles in {stream_id} marked as read"} return result @mcp.tool() @authenticated_tool async def list_categories() -> Dict[str, Any]: """ List all available categories/tags. """ data = await _api_request("get", '/api/greader.php/reader/api/0/tag/list') if "tags" in data: return { "count": len(data["tags"]), "categories": [{ "id": tag.get("id", ""), "type": tag.get("type", ""), "unread_count": tag.get("unread_count", 0) } for tag in data["tags"]] } return data # --- Server Initialization and Shutdown --- async def ensure_authenticated(force: bool = False): """Ensure we're authenticated before making API calls, optionally forcing re-authentication.""" if force or not config.auth_token: # Use global variables for credentials if not FRESHRSS_URL or not FRESHRSS_USER or not FRESHRSS_PASSWORD: raise Exception("Missing credentials for authentication. Please provide them via arguments or environment variables.") logger.info(f"{'Re-authenticating' if force else 'Authenticating'} with FreshRSS server at {FRESHRSS_URL}...") if not await config.authenticate(FRESHRSS_URL, FRESHRSS_USER, FRESHRSS_PASSWORD): raise Exception("Failed to authenticate. Please check your credentials and server URL.") async def close_session(): """Gracefully close the aiohttp session.""" if config.session and not config.session.closed: await config.session.close() logger.info("Aiohttp session closed.") def main(): """Main function to initialize and run the MCP server.""" parser = argparse.ArgumentParser(description='FreshRSS MCP Server') parser.add_argument('--url', help='FreshRSS server URL') parser.add_argument('--email', help='Username for authentication') parser.add_argument('--password', help='Password for authentication') # Use parse_known_args to avoid conflicts with mcpo's arguments args, _ = parser.parse_known_args() # Set global variables for credentials global FRESHRSS_URL, FRESHRSS_USER, FRESHRSS_PASSWORD FRESHRSS_URL = args.url or os.getenv('FRESHRSS_URL') FRESHRSS_USER = args.email or os.getenv('FRESHRSS_EMAIL') FRESHRSS_PASSWORD = args.password or os.getenv('FRESHRSS_PASSWORD') if not all([FRESHRSS_URL, FRESHRSS_USER, FRESHRSS_PASSWORD]): logger.warning("Server starting without initial credentials. They must be provided before using tools.") logger.info("Starting FreshRSS MCP server...") try: mcp.run() finally: # Ensure the session is closed on exit try: loop = asyncio.get_running_loop() if loop.is_running(): loop.create_task(close_session()) else: asyncio.run(close_session()) except RuntimeError: # No running loop pass if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jeromewoody/freshrss-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server