#!/usr/bin/env python3
"""
MCP Server for Lenny RAG.
Provides four tools for hierarchical retrieval:
1. search_lenny - Semantic search across insights and examples
2. get_chapter - Load a specific topic with full context
3. get_full_transcript - Load entire episode transcript
4. list_episodes - Browse available episodes
"""
import asyncio
import json
from typing import Any
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
from .retrieval import get_retriever
# Create MCP server.
# Module-level singleton: the @server.list_tools() / @server.call_tool()
# decorators below register their handlers on this instance, and
# run_server() drives it over stdio.
server = Server("lenny-rag")
@server.list_tools()
async def list_tools() -> list[Tool]:
    """Advertise the four retrieval tools exposed by this server.

    Each entry in the declarative table below is a
    (name, description, JSON-schema) triple; the return statement turns
    the table into Tool objects.
    """
    tool_specs = [
        (
            "search_lenny",
            "Search across Lenny Rachitsky podcast transcripts for insights, "
            "examples, and topics. Returns relevant results with pointers for "
            "deeper exploration. Use this for finding product development insights, "
            "growth strategies, leadership advice, and real-world examples.",
            {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query (e.g., 'pricing B2B products', 'founder mode', 'growth experiments')",
                    },
                    "top_k": {
                        "type": "integer",
                        "description": "Number of results to return (default: 5, max: 20)",
                        "default": 5,
                    },
                    "type_filter": {
                        "type": "string",
                        "enum": ["insight", "example", "topic", "episode"],
                        "description": "Optional: filter results by type",
                    },
                },
                "required": ["query"],
            },
        ),
        (
            "get_chapter",
            "Load a specific chapter/topic from an episode with full context. "
            "Returns the topic summary, all insights, examples, and the raw "
            "transcript segment. Use this after search_lenny to dive deeper "
            "into a specific topic.",
            {
                "type": "object",
                "properties": {
                    "episode": {
                        "type": "string",
                        "description": "Episode filename (e.g., 'Brian Chesky.txt')",
                    },
                    "topic_id": {
                        "type": "string",
                        "description": "Topic ID (e.g., 'topic_1')",
                    },
                },
                "required": ["episode", "topic_id"],
            },
        ),
        (
            "get_full_transcript",
            "Load the complete transcript for an episode. Returns metadata, "
            "topic list, and full transcript text. Use this when you need "
            "the complete context of an episode. Note: transcripts can be "
            "10-40K tokens long.",
            {
                "type": "object",
                "properties": {
                    "episode": {
                        "type": "string",
                        "description": "Episode filename (e.g., 'Brian Chesky.txt')",
                    },
                },
                "required": ["episode"],
            },
        ),
        (
            "list_episodes",
            "List all available Lenny podcast episodes. Optionally filter by "
            "expertise area. Use this to discover what episodes are available "
            "or find episodes by a specific expert type.",
            {
                "type": "object",
                "properties": {
                    "expertise_filter": {
                        "type": "string",
                        "description": "Optional: filter by expertise tag (e.g., 'growth', 'pricing', 'AI')",
                    },
                },
            },
        ),
    ]
    return [
        Tool(name=tool_name, description=description, inputSchema=schema)
        for tool_name, description, schema in tool_specs
    ]
@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
    """Handle tool calls.

    Dispatches the request to the matching retriever method and formats the
    result via a per-tool helper. Unknown tool names yield an explanatory
    message rather than raising, so the client always receives a well-formed
    single-element text response.

    Args:
        name: Tool name as advertised by ``list_tools``.
        arguments: JSON-decoded arguments supplied by the MCP client.

    Returns:
        A one-element list containing the formatted text reply.
    """
    retriever = get_retriever()

    if name == "search_lenny":
        query = arguments.get("query", "")
        # Clamp to the advertised range (default 5, max 20); also guard
        # against zero/negative values, which the original code let through.
        top_k = max(1, min(arguments.get("top_k", 5), 20))
        type_filter = arguments.get("type_filter")
        results = retriever.search(query, top_k=top_k, type_filter=type_filter)
        text = _format_search_results(query, results)

    elif name == "get_chapter":
        episode = arguments.get("episode", "")
        topic_id = arguments.get("topic_id", "")
        chapter = retriever.get_chapter(episode, topic_id)
        if chapter is None:
            text = f"Chapter not found: episode='{episode}', topic_id='{topic_id}'"
        else:
            text = _format_chapter(episode, topic_id, chapter)

    elif name == "get_full_transcript":
        episode = arguments.get("episode", "")
        data = retriever.get_full_transcript(episode)
        if data is None:
            text = f"Episode not found: '{episode}'"
        else:
            text = _format_full_transcript(episode, data)

    elif name == "list_episodes":
        expertise_filter = arguments.get("expertise_filter")
        episodes = retriever.list_episodes(expertise_filter=expertise_filter)
        text = _format_episode_list(expertise_filter, episodes)

    else:
        text = f"Unknown tool: {name}"

    return [TextContent(type="text", text=text)]


def _chapter_pointer_lines(result: dict[str, Any]) -> list[str]:
    """Return the 'Topic ID' line plus a get_chapter follow-up pointer."""
    topic_id = result.get("topic_id", "")
    return [
        f"Topic ID: {topic_id}",
        f"→ Use get_chapter(episode='{result['episode_file']}', topic_id='{topic_id}') for full context",
    ]


def _format_search_results(query: str, results: list[dict[str, Any]]) -> str:
    """Format search hits with type-specific detail and follow-up pointers."""
    parts = [f"Found {len(results)} results for: '{query}'\n"]
    for i, result in enumerate(results, 1):
        parts.append(f"\n--- Result {i} ---")
        parts.append(f"Type: {result['type']}")
        parts.append(f"Episode: {result['episode_file']}")
        parts.append(f"Relevance: {result['relevance_score']:.2f}")
        if result["type"] == "episode":
            parts.append(f"Guest: {result.get('guest', 'Unknown')}")
            if result.get("expertise_tags"):
                parts.append(f"Expertise: {', '.join(result['expertise_tags'])}")
        elif result["type"] == "topic":
            parts.append(f"Topic: {result.get('title', '')}")
            parts.extend(_chapter_pointer_lines(result))
        elif result["type"] == "insight":
            parts.append(f"Insight: {result.get('text', '')}")
            parts.extend(_chapter_pointer_lines(result))
        elif result["type"] == "example":
            explicit = result.get("explicit_text", "")
            # Only show an ellipsis when the text was actually truncated
            # (the old code appended '...' unconditionally).
            suffix = "..." if len(explicit) > 200 else ""
            parts.append(f"Example: {explicit[:200]}{suffix}")
            if result.get("inferred_identity"):
                parts.append(f"Company/Product: {result['inferred_identity']} (confidence: {result.get('confidence', 'medium')})")
            if result.get("lesson"):
                parts.append(f"Lesson: {result['lesson']}")
            if result.get("tags"):
                parts.append(f"Tags: {', '.join(result['tags'])}")
            parts.extend(_chapter_pointer_lines(result))
    return "\n".join(parts)


def _format_chapter(episode: str, topic_id: str, chapter: dict[str, Any]) -> str:
    """Format a chapter: summary, insights, examples, raw transcript segment."""
    parts = [f"# Chapter: {chapter['topic'].get('title', 'Unknown')}\n"]
    parts.append(f"Episode: {episode}")
    parts.append(f"Topic ID: {topic_id}")
    if chapter["topic"].get("summary"):
        parts.append(f"\n## Summary\n{chapter['topic']['summary']}")
    if chapter["insights"]:
        parts.append(f"\n## Insights ({len(chapter['insights'])})")
        for insight in chapter["insights"]:
            parts.append(f"\n- {insight.get('text', '')}")
            if insight.get("context"):
                parts.append(f" Context: {insight['context']}")
    if chapter["examples"]:
        parts.append(f"\n## Examples ({len(chapter['examples'])})")
        for example in chapter["examples"]:
            parts.append(f"\n- {example.get('explicit_text', '')}")
            if example.get("inferred_identity"):
                parts.append(f" (Likely: {example['inferred_identity']})")
            if example.get("lesson"):
                parts.append(f" Lesson: {example['lesson']}")
    if chapter["transcript_segment"]:
        parts.append(f"\n## Raw Transcript (lines {chapter['topic'].get('line_start', '?')}-{chapter['topic'].get('line_end', '?')})")
        parts.append(f"\n{chapter['transcript_segment']}")
    parts.append(f"\n---\n→ Use get_full_transcript(episode='{episode}') for complete episode")
    return "\n".join(parts)


def _format_full_transcript(episode: str, data: dict[str, Any]) -> str:
    """Format episode metadata, topic index, and the full transcript text."""
    parts = [f"# Full Transcript: {episode}\n"]
    meta = data.get("metadata")
    if meta:
        if meta.get("guest"):
            parts.append(f"Guest: {meta['guest']}")
        if meta.get("expertise_tags"):
            parts.append(f"Expertise: {', '.join(meta['expertise_tags'])}")
        if meta.get("summary"):
            parts.append(f"\n## Summary\n{meta['summary']}")
        if meta.get("key_frameworks"):
            parts.append(f"\nKey Frameworks: {', '.join(meta['key_frameworks'])}")
    if data.get("topics"):
        parts.append(f"\n## Topics ({len(data['topics'])})")
        for topic in data["topics"]:
            parts.append(f"- [{topic.get('id', '?')}] {topic.get('title', 'Unknown')}")
    parts.append("\n## Full Transcript")
    parts.append(f"\n{data['transcript']}")
    return "\n".join(parts)


def _format_episode_list(expertise_filter: Any, episodes: list[dict[str, Any]]) -> str:
    """Format the episode catalogue with preprocessing-status markers."""
    if expertise_filter:
        parts = [f"# Episodes filtered by '{expertise_filter}'\n"]
    else:
        parts = [f"# All Available Episodes ({len(episodes)} total)\n"]
    for ep in episodes:
        status = "✓" if ep.get("preprocessed") else "○"
        line = f"{status} {ep['filename']}"
        if ep.get("expertise_tags"):
            line += f" [{', '.join(ep['expertise_tags'][:3])}]"
        parts.append(line)
    parts.append("\n---")
    parts.append("✓ = preprocessed (searchable)")
    parts.append("○ = transcript only")
    parts.append("\nUse search_lenny() to search across all preprocessed episodes")
    return "\n".join(parts)
async def run_server():
    """Serve the MCP protocol over stdio until the client disconnects."""
    async with stdio_server() as streams:
        read_stream, write_stream = streams
        await server.run(read_stream, write_stream, server.create_initialization_options())
def main() -> None:
    """Synchronous entry point for the ``lenny-server`` console script."""
    asyncio.run(run_server())


if __name__ == "__main__":
    main()