main.py•12.5 kB
#!/usr/bin/env python3
"""Beeper MCP Server - Read-only access to Beeper messages."""
import asyncio
import json
import logging
import sys
from pathlib import Path
from typing import Any, Dict, List
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from beeper_reader import BeeperReader
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class BeeperMCPServer:
"""MCP server for Beeper message access."""
def __init__(self):
self.server = Server("beeper-mcp")
self.config = self._load_config()
self.reader = BeeperReader(self.config)
# Register tools
self._register_tools()
def _load_config(self) -> Dict[str, Any]:
"""Load configuration from file or use defaults."""
config_path = Path(__file__).parent / "config.json"
if config_path.exists():
try:
with open(config_path, 'r') as f:
config = json.load(f)
logger.info("Loaded configuration from config.json")
return config
except Exception as e:
logger.error(f"Error loading config: {e}, using defaults")
# Default configuration
return {
"database_paths": [
"~/Library/Application Support/Beeper",
"~/.config/Beeper",
"~/Library/Application Support/Element",
"~/.config/Element"
],
"max_results": 50,
"log_level": "INFO"
}
def _register_tools(self):
"""Register MCP tools."""
@self.server.list_tools()
async def list_tools() -> List[Tool]:
return [
Tool(
name="list_conversations",
description="List recent conversations with metadata",
inputSchema={
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Maximum number of conversations to return",
"default": 20,
"minimum": 1,
"maximum": self.config.get('max_results', 50)
}
}
}
),
Tool(
name="read_messages",
description="Read recent messages from a specific conversation",
inputSchema={
"type": "object",
"properties": {
"conversation_id": {
"type": "string",
"description": "ID of the conversation to read messages from"
},
"limit": {
"type": "integer",
"description": "Maximum number of messages to return",
"default": 20,
"minimum": 1,
"maximum": self.config.get('max_results', 50)
}
},
"required": ["conversation_id"]
}
),
Tool(
name="search_messages",
description="Search for messages containing specific text across all conversations",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query text"
},
"limit": {
"type": "integer",
"description": "Maximum number of results to return",
"default": 20,
"minimum": 1,
"maximum": self.config.get('max_results', 50)
}
},
"required": ["query"]
}
)
]
@self.server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
try:
if name == "list_conversations":
return await self._list_conversations(arguments)
elif name == "read_messages":
return await self._read_messages(arguments)
elif name == "search_messages":
return await self._search_messages(arguments)
else:
raise ValueError(f"Unknown tool: {name}")
except Exception as e:
logger.error(f"Error calling tool {name}: {e}")
return [TextContent(
type="text",
text=f"Error: {str(e)}"
)]
async def _list_conversations(self, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle list_conversations tool call."""
limit = arguments.get('limit', 20)
try:
conversations = await asyncio.to_thread(
self.reader.list_conversations, limit
)
if not conversations:
return [TextContent(
type="text",
text="No conversations found. Please ensure Beeper is installed and has been used."
)]
# Format output
output_lines = [f"Found {len(conversations)} conversation(s):\n"]
for i, conv in enumerate(conversations, 1):
lines = [
f"{i}. {conv['title']}",
f" ID: {conv['id']}",
f" Source: {conv.get('source', 'unknown')}"
]
if conv.get('last_activity'):
lines.append(f" Last activity: {conv['last_activity']}")
if conv.get('participant_count'):
lines.append(f" Participants: {conv['participant_count']}")
if conv.get('last_message_preview'):
preview = conv['last_message_preview']
if len(preview) > 50:
preview = preview[:50] + "..."
lines.append(f" Last message: {preview}")
output_lines.extend(lines)
output_lines.append("") # Empty line between conversations
return [TextContent(
type="text",
text="\n".join(output_lines)
)]
except Exception as e:
logger.error(f"Error listing conversations: {e}")
return [TextContent(
type="text",
text=f"Error listing conversations: {str(e)}"
)]
async def _read_messages(self, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle read_messages tool call."""
conversation_id = arguments.get('conversation_id')
limit = arguments.get('limit', 20)
if not conversation_id:
return [TextContent(
type="text",
text="Error: conversation_id is required"
)]
try:
messages = await asyncio.to_thread(
self.reader.read_messages, conversation_id, limit
)
if not messages:
return [TextContent(
type="text",
text=f"No messages found in conversation '{conversation_id}'"
)]
# Format output
output_lines = [f"Messages from conversation '{conversation_id}':\n"]
for msg in messages:
timestamp = msg.get('timestamp', 'Unknown time')
sender = msg.get('sender', 'Unknown sender')
content = msg.get('content', '')
# Format message
output_lines.append(f"[{timestamp}] {sender}:")
output_lines.append(f" {content}")
output_lines.append("") # Empty line between messages
return [TextContent(
type="text",
text="\n".join(output_lines)
)]
except Exception as e:
logger.error(f"Error reading messages: {e}")
return [TextContent(
type="text",
text=f"Error reading messages: {str(e)}"
)]
async def _search_messages(self, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle search_messages tool call."""
query = arguments.get('query')
limit = arguments.get('limit', 20)
if not query:
return [TextContent(
type="text",
text="Error: query is required"
)]
try:
results = await asyncio.to_thread(
self.reader.search_messages, query, limit
)
if not results:
return [TextContent(
type="text",
text=f"No messages found matching '{query}'"
)]
# Format output
output_lines = [f"Found {len(results)} message(s) matching '{query}':\n"]
for i, msg in enumerate(results, 1):
conv_title = msg.get('conversation_title', 'Unknown conversation')
timestamp = msg.get('timestamp', 'Unknown time')
sender = msg.get('sender', 'Unknown sender')
content = msg.get('content', '')
# Highlight search term in content (simple approach)
if query.lower() in content.lower():
start = content.lower().find(query.lower())
end = start + len(query)
# Show context around match
context_start = max(0, start - 30)
context_end = min(len(content), end + 30)
snippet = content[context_start:context_end]
if context_start > 0:
snippet = "..." + snippet
if context_end < len(content):
snippet = snippet + "..."
else:
snippet = content[:80] + "..." if len(content) > 80 else content
output_lines.extend([
f"{i}. In '{conv_title}'",
f" [{timestamp}] {sender}:",
f" {snippet}",
""
])
return [TextContent(
type="text",
text="\n".join(output_lines)
)]
except Exception as e:
logger.error(f"Error searching messages: {e}")
return [TextContent(
type="text",
text=f"Error searching messages: {str(e)}"
)]
async def run(self):
"""Run the MCP server."""
try:
logger.info("Starting Beeper MCP server...")
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
self.server.create_initialization_options()
)
except KeyboardInterrupt:
logger.info("Server stopped by user")
except Exception as e:
logger.error(f"Server error: {e}")
raise
finally:
self.reader.close()
async def main():
"""Main entry point."""
server = BeeperMCPServer()
await server.run()
if __name__ == "__main__":
asyncio.run(main())