Skip to main content
Glama

Beeper MCP Server

by mimen
main.py12.5 kB
#!/usr/bin/env python3 """Beeper MCP Server - Read-only access to Beeper messages.""" import asyncio import json import logging import sys from pathlib import Path from typing import Any, Dict, List from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent from beeper_reader import BeeperReader # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class BeeperMCPServer: """MCP server for Beeper message access.""" def __init__(self): self.server = Server("beeper-mcp") self.config = self._load_config() self.reader = BeeperReader(self.config) # Register tools self._register_tools() def _load_config(self) -> Dict[str, Any]: """Load configuration from file or use defaults.""" config_path = Path(__file__).parent / "config.json" if config_path.exists(): try: with open(config_path, 'r') as f: config = json.load(f) logger.info("Loaded configuration from config.json") return config except Exception as e: logger.error(f"Error loading config: {e}, using defaults") # Default configuration return { "database_paths": [ "~/Library/Application Support/Beeper", "~/.config/Beeper", "~/Library/Application Support/Element", "~/.config/Element" ], "max_results": 50, "log_level": "INFO" } def _register_tools(self): """Register MCP tools.""" @self.server.list_tools() async def list_tools() -> List[Tool]: return [ Tool( name="list_conversations", description="List recent conversations with metadata", inputSchema={ "type": "object", "properties": { "limit": { "type": "integer", "description": "Maximum number of conversations to return", "default": 20, "minimum": 1, "maximum": self.config.get('max_results', 50) } } } ), Tool( name="read_messages", description="Read recent messages from a specific conversation", inputSchema={ "type": "object", "properties": { "conversation_id": { "type": "string", "description": "ID of the conversation to read messages from" }, "limit": { "type": "integer", "description": "Maximum number of messages to return", "default": 20, "minimum": 1, "maximum": self.config.get('max_results', 50) } }, "required": ["conversation_id"] } ), Tool( name="search_messages", description="Search for messages containing specific text across all conversations", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Search query text" }, "limit": { "type": "integer", "description": "Maximum number of results to return", "default": 20, "minimum": 1, "maximum": self.config.get('max_results', 50) } }, "required": ["query"] } ) ] @self.server.call_tool() async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: try: if name == "list_conversations": return await self._list_conversations(arguments) elif name == "read_messages": return await self._read_messages(arguments) elif name == "search_messages": return await self._search_messages(arguments) else: raise ValueError(f"Unknown tool: {name}") except Exception as e: logger.error(f"Error calling tool {name}: {e}") return [TextContent( type="text", text=f"Error: {str(e)}" )] async def _list_conversations(self, arguments: Dict[str, Any]) -> List[TextContent]: """Handle list_conversations tool call.""" limit = arguments.get('limit', 20) try: conversations = await asyncio.to_thread( self.reader.list_conversations, limit ) if not conversations: return [TextContent( type="text", text="No conversations found. Please ensure Beeper is installed and has been used." )] # Format output output_lines = [f"Found {len(conversations)} conversation(s):\n"] for i, conv in enumerate(conversations, 1): lines = [ f"{i}. {conv['title']}", f" ID: {conv['id']}", f" Source: {conv.get('source', 'unknown')}" ] if conv.get('last_activity'): lines.append(f" Last activity: {conv['last_activity']}") if conv.get('participant_count'): lines.append(f" Participants: {conv['participant_count']}") if conv.get('last_message_preview'): preview = conv['last_message_preview'] if len(preview) > 50: preview = preview[:50] + "..." lines.append(f" Last message: {preview}") output_lines.extend(lines) output_lines.append("") # Empty line between conversations return [TextContent( type="text", text="\n".join(output_lines) )] except Exception as e: logger.error(f"Error listing conversations: {e}") return [TextContent( type="text", text=f"Error listing conversations: {str(e)}" )] async def _read_messages(self, arguments: Dict[str, Any]) -> List[TextContent]: """Handle read_messages tool call.""" conversation_id = arguments.get('conversation_id') limit = arguments.get('limit', 20) if not conversation_id: return [TextContent( type="text", text="Error: conversation_id is required" )] try: messages = await asyncio.to_thread( self.reader.read_messages, conversation_id, limit ) if not messages: return [TextContent( type="text", text=f"No messages found in conversation '{conversation_id}'" )] # Format output output_lines = [f"Messages from conversation '{conversation_id}':\n"] for msg in messages: timestamp = msg.get('timestamp', 'Unknown time') sender = msg.get('sender', 'Unknown sender') content = msg.get('content', '') # Format message output_lines.append(f"[{timestamp}] {sender}:") output_lines.append(f" {content}") output_lines.append("") # Empty line between messages return [TextContent( type="text", text="\n".join(output_lines) )] except Exception as e: logger.error(f"Error reading messages: {e}") return [TextContent( type="text", text=f"Error reading messages: {str(e)}" )] async def _search_messages(self, arguments: Dict[str, Any]) -> List[TextContent]: """Handle search_messages tool call.""" query = arguments.get('query') limit = arguments.get('limit', 20) if not query: return [TextContent( type="text", text="Error: query is required" )] try: results = await asyncio.to_thread( self.reader.search_messages, query, limit ) if not results: return [TextContent( type="text", text=f"No messages found matching '{query}'" )] # Format output output_lines = [f"Found {len(results)} message(s) matching '{query}':\n"] for i, msg in enumerate(results, 1): conv_title = msg.get('conversation_title', 'Unknown conversation') timestamp = msg.get('timestamp', 'Unknown time') sender = msg.get('sender', 'Unknown sender') content = msg.get('content', '') # Highlight search term in content (simple approach) if query.lower() in content.lower(): start = content.lower().find(query.lower()) end = start + len(query) # Show context around match context_start = max(0, start - 30) context_end = min(len(content), end + 30) snippet = content[context_start:context_end] if context_start > 0: snippet = "..." + snippet if context_end < len(content): snippet = snippet + "..." else: snippet = content[:80] + "..." if len(content) > 80 else content output_lines.extend([ f"{i}. In '{conv_title}'", f" [{timestamp}] {sender}:", f" {snippet}", "" ]) return [TextContent( type="text", text="\n".join(output_lines) )] except Exception as e: logger.error(f"Error searching messages: {e}") return [TextContent( type="text", text=f"Error searching messages: {str(e)}" )] async def run(self): """Run the MCP server.""" try: logger.info("Starting Beeper MCP server...") async with stdio_server() as (read_stream, write_stream): await self.server.run( read_stream, write_stream, self.server.create_initialization_options() ) except KeyboardInterrupt: logger.info("Server stopped by user") except Exception as e: logger.error(f"Server error: {e}") raise finally: self.reader.close() async def main(): """Main entry point.""" server = BeeperMCPServer() await server.run() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mimen/beeper-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server