#!/usr/bin/env python3
# Copyright (c) 2026 ngpestelos
# Licensed under the MIT License - see LICENSE file for details
"""
Minimal Python Readwise MCP Server
Token-efficient, single-file implementation using FastMCP
"""
import json
import os
import re
import sys
import urllib.parse
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, List, Tuple
import requests
import yaml
from mcp.server.fastmcp import FastMCP
# Configure logging to stderr
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
stream=sys.stderr
)
logger = logging.getLogger(__name__)
# Configuration from environment variables
READWISE_TOKEN = os.environ.get("READWISE_TOKEN")
VAULT_PATH = Path(os.environ.get("VAULT_PATH", "/Users/ngpestelos/src/PARA"))
STATE_FILE = VAULT_PATH / ".claude/state/readwise-import.json"
DOCUMENTS_DIR = VAULT_PATH / "2 Resources/Readwise/Documents"
DAILY_REVIEWS_DIR = VAULT_PATH / "2 Resources/Readwise/Daily Reviews"
ARCHIVES_DIR = VAULT_PATH / "3 Archives/Readwise"
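# Example environment setup (illustrative values; tokens come from
# https://readwise.io/access_token):
#   export READWISE_TOKEN="xxxxxxxxxxxxxxxx"
#   export VAULT_PATH="/path/to/your/vault"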
# Validate configuration (only when running as main)
def validate_config():
if not READWISE_TOKEN:
logger.error("READWISE_TOKEN environment variable not set")
sys.exit(1)
# ============================================================================
# UTILITY FUNCTIONS (reused from backfill.py)
# ============================================================================
def load_state() -> Dict:
"""Load state file or create default"""
if STATE_FILE.exists():
with open(STATE_FILE, 'r') as f:
return json.load(f)
return {
"last_import_timestamp": datetime.now(timezone.utc).isoformat(),
"synced_ranges": [],
"backfill_in_progress": False
}
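# A populated state file ends up looking roughly like this (shape inferred from the
# defaults above plus the keys written by readwise_init_ranges further down):
# {
#   "last_import_timestamp": "2025-06-01T08:00:00+00:00",
#   "synced_ranges": [
#     {"start": "2025-01-03T09:12:00+00:00", "end": "2025-06-01T08:00:00+00:00",
#      "doc_count": 120, "verified_at": "2025-06-01T08:00:00+00:00"}
#   ],
#   "backfill_in_progress": false,
#   "oldest_imported_date": "2025-01-03"
# }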
def write_state(state: Dict) -> None:
"""Write state file"""
STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(STATE_FILE, 'w') as f:
json.dump(state, f, indent=2)
def optimize_backfill(target_date: str, synced_ranges: List[Dict]) -> Tuple[bool, Optional[str]]:
"""
Check synced_ranges before pagination to skip already-synced content.
Returns:
(should_proceed, optimized_updated_after)
"""
if not synced_ranges:
return (True, None)
# Convert target date to timestamp (timezone-aware)
target_ts = datetime.fromisoformat(target_date + "T00:00:00+00:00")
# Sort ranges by start timestamp
ranges = sorted(synced_ranges, key=lambda r: r['start'])
for range_item in ranges:
range_start = datetime.fromisoformat(range_item['start'].replace('Z', '+00:00'))
range_end = datetime.fromisoformat(range_item['end'].replace('Z', '+00:00'))
# Case 1: Target date falls within synced range
if range_start <= target_ts <= range_end:
logger.info(f"Target date {target_date} already synced (range: {range_item['start']} to {range_item['end']})")
return (False, None) # Skip - already synced
# Case 2: Target date is before synced range
if target_ts < range_start:
# Gap exists between target and synced range start
# Don't use updatedAfter - we need to fill the gap
# Pagination will stop when hitting target date
# Deduplication will handle overlap with synced range
logger.info(f"Gap detected: target {target_date} is before synced range {range_item['start']}")
logger.info(f"Will paginate to fill gap (no updatedAfter filter)")
return (True, None) # No filter - fill the gap
# Case 3: Target date is after all ranges
return (True, None)
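# Illustrative outcomes, assuming a single synced range covering January 2025:
#   optimize_backfill("2025-01-15", ranges) -> (False, None)  # inside the range: skip
#   optimize_backfill("2024-12-01", ranges) -> (True, None)   # before the range: fill the gap
#   optimize_backfill("2025-02-10", ranges) -> (True, None)   # after all ranges: proceed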
def scan_existing_documents() -> Tuple[set, set]:
"""Scan filesystem to build known IDs and filenames"""
known_ids = set()
known_filenames = set()
for directory in [DOCUMENTS_DIR, ARCHIVES_DIR, DAILY_REVIEWS_DIR]:
if not directory.exists():
continue
for filepath in directory.glob("*.md"):
# Track filename
known_filenames.add(filepath.name)
# Extract ID from frontmatter if present
try:
with open(filepath, 'r') as f:
content = f.read()
# Extract readwise_url from YAML frontmatter
match = re.search(r'^readwise_url:\s*"?([^"\n]+)"?', content, re.MULTILINE)
if match:
url = match.group(1)
# Extract ID from URL (last path segment)
doc_id = url.rstrip('/').split('/')[-1]
known_ids.add(doc_id)
            except Exception:
                pass  # Skip files with read errors
return known_ids, known_filenames
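# Example of the frontmatter line matched above (illustrative URL; only the last
# path segment is kept as the document ID):
#   readwise_url: "https://readwise.io/.../01abc123"  ->  doc_id "01abc123"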
def sanitize_filename(title: Optional[str], doc: Optional[Dict] = None) -> str:
"""
Sanitize title for filename with fallback for invalid names.
Args:
title: The document title to sanitize
doc: Optional document dict for fallback metadata (author, saved_at)
Returns:
Sanitized filename ending in .md
"""
    # Replace special characters (fall back to an empty string if title is None)
    filename = (title or "").replace('/', '-').replace(':', ' -')
# Remove invalid characters
filename = re.sub(r'[<>"\\\|?*]', '', filename)
# Trim to 100 characters
filename = filename[:100].strip()
# Check if filename has at least one alphanumeric character
if not any(c.isalnum() for c in filename):
# Fallback: use author + date or generic name
if doc:
            author = doc.get('author') or 'Unknown'
# Sanitize author name
author = re.sub(r'[<>"\\\|?*/:]', '', author)[:30].strip()
saved_at = doc.get('saved_at', '')
date_str = saved_at[:10] if saved_at else datetime.now().strftime('%Y-%m-%d')
# Use category to make name more descriptive
            category = doc.get('category') or 'Document'
category_label = 'Tweet' if category == 'tweet' else category.capitalize()
filename = f"{category_label} by {author} - {date_str}"
else:
# Generic fallback
filename = f"Untitled - {datetime.now().strftime('%Y-%m-%d-%H%M%S')}"
return filename + ".md"
def extract_id_from_url(url: Optional[str]) -> Optional[str]:
"""Extract document ID from Readwise URL"""
if not url:
return None
return url.rstrip('/').split('/')[-1]
# ============================================================================
# NEW FUNCTIONS (for MCP server)
# ============================================================================
def fetch_api(endpoint: str, params: Optional[Dict] = None) -> Dict:
"""Make authenticated API call to Readwise"""
base_url = "https://readwise.io/api/v3"
url = f"{base_url}{endpoint}"
headers = {
"Authorization": f"Token {READWISE_TOKEN}"
}
    response = requests.get(url, headers=headers, params=params, timeout=30)
response.raise_for_status()
return response.json()
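# Example call (mirrors how the tools below use it):
#   fetch_api("/list/", {"category": "tweet", "limit": 20, "updatedAfter": "2025-06-01T00:00:00+00:00"})
#
# NOTE: the base URL above is the Reader ("v3") API, which serves /list/. Readwise's
# classic highlight endpoints are documented under /api/v2/, so the /highlights/ calls
# below (and highlight fields such as book_title / source_url) are assumptions about
# what this base serves - point those calls at the v2 base if they return 404s.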
def format_document_markdown(doc: Dict) -> str:
"""Convert API document to markdown with YAML frontmatter"""
# Build frontmatter
frontmatter = {
"title": doc.get("title", "Untitled"),
"author": doc.get("author"),
"source": doc.get("source"),
"category": doc.get("category"),
"saved_at": doc.get("saved_at"),
"updated_at": doc.get("updated_at"),
"readwise_url": doc.get("readwise_url"),
"source_url": doc.get("source_url"),
"tags": doc.get("tags", [])
}
# Remove None values
frontmatter = {k: v for k, v in frontmatter.items() if v is not None}
# Build markdown
yaml_str = yaml.dump(frontmatter, allow_unicode=True, default_flow_style=False)
content = doc.get("content", "")
summary = doc.get("summary", "")
notes = doc.get("notes", "")
markdown = f"---\n{yaml_str}---\n\n"
if summary:
markdown += f"## Summary\n\n{summary}\n\n"
if content:
markdown += f"## Content\n\n{content}\n\n"
if notes:
markdown += f"## Notes\n\n{notes}\n\n"
return markdown
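# Sketch of the generated markdown (illustrative):
#   ---
#   title: Some Article
#   author: Jane Doe
#   category: article
#   saved_at: '2025-01-15T10:00:00Z'
#   ---
#
#   ## Summary
#   ...
#
#   ## Content
#   ...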
def save_document(doc: Dict, directory: Path) -> Path:
"""Save document as markdown file"""
directory.mkdir(parents=True, exist_ok=True)
filename = sanitize_filename(doc.get("title", ""), doc)
filepath = directory / filename
# Handle filename collisions
counter = 1
while filepath.exists():
name_without_ext = filename[:-3] # Remove .md
filepath = directory / f"{name_without_ext} ({counter}).md"
counter += 1
markdown = format_document_markdown(doc)
with open(filepath, 'w') as f:
f.write(markdown)
return filepath
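# Filename collisions are resolved by suffixing a counter:
#   "Some Article.md" exists -> "Some Article (1).md", then "Some Article (2).md", ...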
# ============================================================================
# MCP SERVER INITIALIZATION
# ============================================================================
mcp = FastMCP("readwise")
# ============================================================================
# MCP TOOLS (8 essential tools)
# ============================================================================
@mcp.tool()
async def readwise_daily_review() -> dict:
"""Fetch today's highlights and save to Daily Reviews directory"""
try:
# Get today's date
today = datetime.now(timezone.utc).date()
today_str = today.isoformat()
        # Fetch the most recent highlights (the highlights endpoint stands in for a daily review feed)
data = fetch_api("/highlights/", params={"limit": 50})
highlights = data.get("results", [])
if not highlights:
return {"status": "no_highlights", "count": 0}
# Create daily review file
filename = f"{today_str}.md"
filepath = DAILY_REVIEWS_DIR / filename
DAILY_REVIEWS_DIR.mkdir(parents=True, exist_ok=True)
# Format content
content = f"# Daily Review - {today_str}\n\n"
for highlight in highlights:
content += f"## {highlight.get('text', '')}\n\n"
if highlight.get('note'):
content += f"**Note**: {highlight['note']}\n\n"
content += f"**Source**: {highlight.get('source_url', 'Unknown')}\n\n---\n\n"
with open(filepath, 'w') as f:
f.write(content)
return {
"status": "success",
"count": len(highlights),
"file": str(filepath)
}
except Exception as e:
logger.error(f"Error in daily review: {e}")
return {"status": "error", "message": str(e)}
@mcp.tool()
async def readwise_import_recent(category: str = "tweet", limit: int = 20) -> dict:
"""Import recent documents since last import with deduplication"""
try:
# Load state
state = load_state()
last_import = state.get("last_import_timestamp")
# Scan existing documents
known_ids, known_filenames = scan_existing_documents()
# Build API params
params = {"category": category, "limit": limit}
if last_import:
params["updatedAfter"] = last_import
# Fetch documents
data = fetch_api("/list/", params=params)
results = data.get("results", [])
imported = 0
skipped = 0
for doc in results:
# Check deduplication
doc_id = extract_id_from_url(doc.get("readwise_url"))
filename = sanitize_filename(doc.get("title", ""), doc)
if doc_id in known_ids or filename in known_filenames:
skipped += 1
continue
# Save document
save_document(doc, DOCUMENTS_DIR)
imported += 1
# Track for session deduplication
if doc_id:
known_ids.add(doc_id)
known_filenames.add(filename)
# Update state
if results:
state["last_import_timestamp"] = datetime.now(timezone.utc).isoformat()
write_state(state)
return {
"status": "success",
"imported": imported,
"skipped": skipped,
"total_analyzed": len(results)
}
except Exception as e:
logger.error(f"Error importing recent: {e}")
return {"status": "error", "message": str(e)}
@mcp.tool()
async def readwise_backfill(target_date: str, category: str = "tweet") -> dict:
"""Paginate to target date with synced range optimization"""
try:
# Load state
state = load_state()
synced_ranges = state.get("synced_ranges", [])
# Check optimization
should_proceed, optimized_after = optimize_backfill(target_date, synced_ranges)
if not should_proceed:
return {
"status": "already_synced",
"message": f"Target date {target_date} already synced",
"imported": 0,
"skipped": 0
}
# Scan existing documents
known_ids, known_filenames = scan_existing_documents()
# Build initial params
params = {"category": category, "limit": 50}
if optimized_after:
params["updatedAfter"] = optimized_after
# Pagination loop
cursor = None
imported = 0
skipped = 0
page_num = 0
target_dt = datetime.strptime(target_date, "%Y-%m-%d")
reached_target = False
while not reached_target and page_num < 100: # Safety limit
page_num += 1
# Update params with cursor
if cursor:
params["pageCursor"] = cursor
# Fetch page
data = fetch_api("/list/", params=params)
results = data.get("results", [])
if not results:
break
# Process documents
for doc in results:
doc_date = datetime.fromisoformat(doc["saved_at"].replace('Z', '+00:00'))
# Check if reached target
if doc_date.date() < target_dt.date():
reached_target = True
break
# Deduplicate
doc_id = extract_id_from_url(doc.get("readwise_url"))
filename = sanitize_filename(doc.get("title", ""), doc)
if doc_id in known_ids or filename in known_filenames:
skipped += 1
continue
# Save document
save_document(doc, DOCUMENTS_DIR)
imported += 1
# Track for session deduplication
if doc_id:
known_ids.add(doc_id)
known_filenames.add(filename)
if reached_target:
break
# Get next cursor
cursor = data.get("nextPageCursor")
if not cursor:
break
# Update state
state["last_import_timestamp"] = datetime.now(timezone.utc).isoformat()
write_state(state)
return {
"status": "success" if reached_target else "completed_all_pages",
"imported": imported,
"skipped": skipped,
"pages": page_num,
"reached_target": reached_target
}
except Exception as e:
logger.error(f"Error in backfill: {e}")
return {"status": "error", "message": str(e)}
@mcp.tool()
async def readwise_book_highlights(title: Optional[str] = None, book_id: Optional[str] = None) -> dict:
"""Get highlights for a specific book"""
try:
# Build params
params = {}
if book_id:
params["book_id"] = book_id
# Fetch highlights
data = fetch_api("/highlights/", params=params)
highlights = data.get("results", [])
# Filter by title if provided
if title:
            highlights = [h for h in highlights if title.lower() in (h.get("book_title") or "").lower()]
return {
"status": "success",
"count": len(highlights),
"highlights": [
{
"text": h.get("text"),
"note": h.get("note"),
"book_title": h.get("book_title"),
"location": h.get("location")
}
for h in highlights[:50] # Limit to 50 for token efficiency
]
}
except Exception as e:
logger.error(f"Error fetching book highlights: {e}")
return {"status": "error", "message": str(e)}
@mcp.tool()
async def readwise_search_highlights(query: str, limit: int = 50) -> dict:
"""Search highlights by text query"""
try:
        # The API doesn't support text search directly, so fetch the most recent page
        # of highlights and filter locally (only that page is searched)
data = fetch_api("/highlights/", params={"limit": 100})
highlights = data.get("results", [])
# Filter by query
query_lower = query.lower()
matching = [
h for h in highlights
if query_lower in h.get("text", "").lower() or
query_lower in h.get("note", "").lower()
]
return {
"status": "success",
"count": len(matching),
"highlights": [
{
"text": h.get("text"),
"note": h.get("note"),
"source": h.get("source_url"),
"created_at": h.get("created_at")
}
for h in matching[:limit]
]
}
except Exception as e:
logger.error(f"Error searching highlights: {e}")
return {"status": "error", "message": str(e)}
@mcp.tool()
async def readwise_state_info() -> dict:
"""Show current import state and synced ranges"""
try:
state = load_state()
# Scan filesystem for current count
known_ids, known_filenames = scan_existing_documents()
return {
"status": "success",
"last_import": state.get("last_import_timestamp"),
"oldest_imported": state.get("oldest_imported_date"),
"synced_ranges": state.get("synced_ranges", []),
"backfill_in_progress": state.get("backfill_in_progress", False),
"documents_on_disk": len(known_filenames),
"documents_with_ids": len(known_ids)
}
except Exception as e:
logger.error(f"Error getting state info: {e}")
return {"status": "error", "message": str(e)}
@mcp.tool()
async def readwise_init_ranges() -> dict:
"""Scan filesystem to build synced_ranges from existing documents"""
try:
# Scan all documents
docs_with_dates = []
for directory in [DOCUMENTS_DIR, ARCHIVES_DIR]:
if not directory.exists():
continue
for filepath in directory.glob("*.md"):
try:
with open(filepath, 'r') as f:
content = f.read()
# Extract saved_at from frontmatter
match = re.search(r'^saved_at:\s*"?([^"\n]+)"?', content, re.MULTILINE)
if match:
saved_at = match.group(1)
docs_with_dates.append(saved_at)
except Exception:
pass
if not docs_with_dates:
return {"status": "no_documents", "message": "No documents with dates found"}
# Sort dates
dates = sorted([datetime.fromisoformat(d.replace('Z', '+00:00')) for d in docs_with_dates])
# Build single range
synced_range = {
"start": dates[0].isoformat(),
"end": dates[-1].isoformat(),
"doc_count": len(docs_with_dates),
"verified_at": datetime.now(timezone.utc).isoformat()
}
# Update state
state = load_state()
state["synced_ranges"] = [synced_range]
state["oldest_imported_date"] = dates[0].strftime("%Y-%m-%d")
write_state(state)
return {
"status": "success",
"range": synced_range,
"documents_analyzed": len(docs_with_dates)
}
except Exception as e:
logger.error(f"Error initializing ranges: {e}")
return {"status": "error", "message": str(e)}
@mcp.tool()
async def readwise_reset_state(clear_ranges: bool = False) -> dict:
"""Clear state file (optionally preserve synced_ranges)"""
try:
if clear_ranges:
# Full reset
new_state = {
"last_import_timestamp": datetime.now(timezone.utc).isoformat(),
"synced_ranges": [],
"backfill_in_progress": False
}
else:
# Preserve ranges
state = load_state()
new_state = {
"last_import_timestamp": datetime.now(timezone.utc).isoformat(),
"synced_ranges": state.get("synced_ranges", []),
"backfill_in_progress": False
}
write_state(new_state)
return {
"status": "success",
"message": "State reset",
"cleared_ranges": clear_ranges
}
except Exception as e:
logger.error(f"Error resetting state: {e}")
return {"status": "error", "message": str(e)}
# ============================================================================
# SERVER STARTUP
# ============================================================================
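# Run directly for local testing (FastMCP defaults to the stdio transport, which is
# what MCP clients expect to spawn); placeholder values shown:
#   READWISE_TOKEN="xxxx" VAULT_PATH="/path/to/your/vault" python3 server.py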
if __name__ == "__main__":
validate_config()
logger.info("Starting Readwise MCP Server")
logger.info(f"Vault path: {VAULT_PATH}")
logger.info(f"Documents directory: {DOCUMENTS_DIR}")
mcp.run()