#!/usr/bin/env python3
"""
Substack Reader MCP Server
An MCP server for reading Substack subscriptions and posts.
Provides tools to list subscriptions, fetch recent posts, and read content.
"""
import asyncio
import json
import os
from pathlib import Path
from typing import Any
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# Import substack_api components
from substack_api import User, Newsletter, Post, SubstackAuth
# Configuration
COOKIES_PATH = os.environ.get(
"SUBSTACK_COOKIES_PATH",
str(Path.home() / ".config" / "substack-reader" / "cookies.json")
)
USERNAME = os.environ.get("SUBSTACK_USERNAME", "")
def get_auth() -> SubstackAuth | None:
"""Load authentication from cookies file if available."""
if Path(COOKIES_PATH).exists():
return SubstackAuth(cookies_path=COOKIES_PATH)
return None
def get_subscriptions(username: str) -> list[dict]:
"""Get list of subscribed newsletters for a user."""
user = User(username)
subs = user.get_subscriptions()
# Normalize the response format
return [
{
"name": sub.get("publication_name", "Unknown"),
"url": f"https://{sub.get('domain', '')}",
"domain": sub.get("domain", ""),
"publication_id": sub.get("publication_id"),
}
for sub in subs
]
def get_newsletter_posts(newsletter_url: str, limit: int = 10) -> list[dict]:
"""Get recent posts from a newsletter."""
auth = get_auth()
newsletter = Newsletter(newsletter_url, auth=auth)
posts = newsletter.get_posts(limit=limit)
result = []
for p in posts:
metadata = p.get_metadata()
result.append({
"title": metadata.get("title", "Unknown"),
"subtitle": metadata.get("subtitle", ""),
"url": p.url,
"date": metadata.get("post_date", ""),
"is_paywalled": p.is_paywalled() if hasattr(p, "is_paywalled") else False,
})
return result
def get_post_content(post_url: str) -> dict:
"""Get full content of a post."""
auth = get_auth()
post = Post(post_url, auth=auth)
metadata = post.get_metadata()
content = post.get_content()
return {
"title": metadata.get("title", ""),
"subtitle": metadata.get("subtitle", ""),
"author": metadata.get("author", {}).get("name", ""),
"date": metadata.get("post_date", ""),
"content": content,
"is_paywalled": post.is_paywalled() if hasattr(post, "is_paywalled") else False,
}
def search_newsletter(newsletter_url: str, query: str, limit: int = 5) -> list[dict]:
"""Search for posts within a newsletter."""
auth = get_auth()
newsletter = Newsletter(newsletter_url, auth=auth)
results = newsletter.search_posts(query, limit=limit)
result = []
for p in results:
metadata = p.get_metadata()
result.append({
"title": metadata.get("title", "Unknown"),
"subtitle": metadata.get("subtitle", ""),
"url": p.url,
})
return result
# Create MCP server
server = Server("substack-reader")
@server.list_tools()
async def list_tools() -> list[Tool]:
"""List available tools."""
return [
Tool(
name="list_subscriptions",
description="List all newsletters you're subscribed to. Requires SUBSTACK_USERNAME env var.",
inputSchema={
"type": "object",
"properties": {
"username": {
"type": "string",
"description": "Substack username (defaults to SUBSTACK_USERNAME env var)",
}
},
},
),
Tool(
name="get_recent_posts",
description="Get recent posts from a specific newsletter",
inputSchema={
"type": "object",
"properties": {
"newsletter_url": {
"type": "string",
"description": "Newsletter URL (e.g., https://example.substack.com)",
},
"limit": {
"type": "integer",
"description": "Number of posts to fetch (default: 10)",
"default": 10,
},
},
"required": ["newsletter_url"],
},
),
Tool(
name="read_post",
description="Read the full content of a Substack post",
inputSchema={
"type": "object",
"properties": {
"post_url": {
"type": "string",
"description": "Full URL of the post",
},
},
"required": ["post_url"],
},
),
Tool(
name="search_newsletter",
description="Search for posts within a specific newsletter",
inputSchema={
"type": "object",
"properties": {
"newsletter_url": {
"type": "string",
"description": "Newsletter URL to search",
},
"query": {
"type": "string",
"description": "Search query",
},
"limit": {
"type": "integer",
"description": "Max results (default: 5)",
"default": 5,
},
},
"required": ["newsletter_url", "query"],
},
),
Tool(
name="get_feed",
description="Get recent posts from all your subscribed newsletters",
inputSchema={
"type": "object",
"properties": {
"username": {
"type": "string",
"description": "Substack username",
},
"posts_per_newsletter": {
"type": "integer",
"description": "Posts to fetch per newsletter (default: 3)",
"default": 3,
},
},
},
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
"""Handle tool calls."""
try:
if name == "list_subscriptions":
username = arguments.get("username") or USERNAME
if not username:
return [TextContent(
type="text",
text="Error: No username provided. Set SUBSTACK_USERNAME env var or pass username argument."
)]
subscriptions = get_subscriptions(username)
return [TextContent(
type="text",
text=json.dumps(subscriptions, indent=2)
)]
elif name == "get_recent_posts":
newsletter_url = arguments["newsletter_url"]
limit = arguments.get("limit", 10)
posts = get_newsletter_posts(newsletter_url, limit)
return [TextContent(
type="text",
text=json.dumps(posts, indent=2)
)]
elif name == "read_post":
post_url = arguments["post_url"]
content = get_post_content(post_url)
return [TextContent(
type="text",
text=json.dumps(content, indent=2)
)]
elif name == "search_newsletter":
newsletter_url = arguments["newsletter_url"]
query = arguments["query"]
limit = arguments.get("limit", 5)
results = search_newsletter(newsletter_url, query, limit)
return [TextContent(
type="text",
text=json.dumps(results, indent=2)
)]
elif name == "get_feed":
username = arguments.get("username") or USERNAME
posts_per = arguments.get("posts_per_newsletter", 3)
if not username:
return [TextContent(
type="text",
text="Error: No username provided."
)]
subscriptions = get_subscriptions(username)
feed = []
for sub in subscriptions[:20]: # Limit to 20 newsletters
try:
url = sub.get("url") or sub.get("base_url")
if url:
posts = get_newsletter_posts(url, posts_per)
for post in posts:
post["newsletter"] = sub.get("name", url)
feed.append(post)
except Exception as e:
continue # Skip newsletters that fail
# Sort by date (newest first)
feed.sort(key=lambda x: x.get("date", ""), reverse=True)
return [TextContent(
type="text",
text=json.dumps(feed[:30], indent=2) # Return top 30
)]
else:
return [TextContent(
type="text",
text=f"Unknown tool: {name}"
)]
except Exception as e:
return [TextContent(
type="text",
text=f"Error: {str(e)}"
)]
async def main():
"""Run the MCP server."""
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, server.create_initialization_options())
if __name__ == "__main__":
asyncio.run(main())