Skip to main content
Glama

Beeper MCP Server

by mimen
beeper_reader.py34.8 kB
"""Beeper database reader for MCP server.""" import sqlite3 import json import logging from pathlib import Path from typing import List, Dict, Any, Optional, Tuple, Union from datetime import datetime import os logger = logging.getLogger(__name__) try: import plyvel LEVELDB_AVAILABLE = True except ImportError: LEVELDB_AVAILABLE = False logger.warning("plyvel not available - LevelDB support disabled") class BeeperReader: """Read-only access to Beeper/Element message databases.""" def __init__(self, config: Dict[str, Any]): self.config = config self.db_paths = self._discover_databases() self.connections: Dict[str, sqlite3.Connection] = {} self.leveldb_connections: Dict[str, 'plyvel.DB'] = {} def _discover_databases(self) -> List[Tuple[Path, str]]: """Discover Beeper/Element databases in common locations. Returns list of tuples: (path, type) where type is 'sqlite' or 'leveldb' """ found_dbs = [] logger.info("Starting database discovery...") for path_str in self.config.get('database_paths', []): expanded_path = os.path.expanduser(path_str) search_path = Path(expanded_path) logger.info(f"Checking path: {expanded_path}") if not search_path.exists(): logger.info(f"Path does not exist: {expanded_path}") continue logger.info(f"Path exists: {expanded_path}") # Search for SQLite databases patterns = ['*.db', '*.sqlite', '*.sqlite3'] for pattern in patterns: logger.info(f"Searching for SQLite pattern: {pattern}") for db_file in search_path.glob(f"**/{pattern}"): logger.info(f"Found potential SQLite database: {db_file}") if self._validate_sqlite_database(db_file): found_dbs.append((db_file, 'sqlite')) logger.info(f"Validated SQLite database: {db_file}") else: logger.info(f"Invalid SQLite database: {db_file}") # Search for LevelDB databases (IndexedDB) if LEVELDB_AVAILABLE: logger.info("LevelDB support is available") # Look for IndexedDB LevelDB directories leveldb_patterns = ['*.leveldb', '*.indexeddb.leveldb'] for pattern in leveldb_patterns: logger.info(f"Searching for LevelDB pattern: {pattern}") for db_dir in search_path.glob(f"**/{pattern}"): logger.info(f"Found potential LevelDB database: {db_dir}") if db_dir.is_dir(): logger.info(f"Directory exists: {db_dir}") if self._validate_leveldb_database(db_dir): found_dbs.append((db_dir, 'leveldb')) logger.info(f"Validated LevelDB database: {db_dir}") else: logger.info(f"Invalid LevelDB database: {db_dir}") else: logger.info(f"Not a directory: {db_dir}") else: logger.info("LevelDB support is not available") return found_dbs def _validate_sqlite_database(self, db_path: Path) -> bool: """Check if SQLite database contains message-related tables.""" try: logger.info(f"Validating SQLite database: {db_path}") conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) cursor = conn.cursor() # Check for common table names cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") tables = {row[0] for row in cursor.fetchall()} logger.info(f"Found tables in {db_path}: {', '.join(tables)}") # Matrix/Element tables if {'events', 'rooms'}.issubset(tables): logger.info(f"Found Matrix/Element tables in {db_path}") conn.close() return True # Generic messaging tables if {'messages', 'conversations'}.issubset(tables): logger.info(f"Found generic messaging tables in {db_path}") conn.close() return True # Beeper-specific tables message_tables = [table for table in tables if 'message' in table.lower() or 'room' in table.lower()] if message_tables: logger.info(f"Found Beeper-specific tables in {db_path}: {', '.join(message_tables)}") conn.close() return True logger.info(f"No relevant tables found in {db_path}") conn.close() return False except Exception as e: logger.info(f"Error validating SQLite {db_path}: {e}") return False def _validate_leveldb_database(self, db_path: Path) -> bool: """Check if LevelDB database contains message-related data.""" if not LEVELDB_AVAILABLE: logger.info("LevelDB validation skipped - plyvel not available") return False try: logger.info(f"Validating LevelDB database: {db_path}") # Try to open the database in read-only mode db = plyvel.DB(str(db_path), create_if_missing=False) logger.info("Successfully opened LevelDB database") # Check for some keys that might indicate this is a messaging database # IndexedDB typically uses keys with specific patterns found_messages = False count = 0 found_keys = [] for key, value in db: if count > 100: # Sample first 100 keys break key_str = key.decode('utf-8', errors='ignore').lower() found_keys.append(key_str) # Look for keys that might indicate messages or rooms if any(pattern in key_str for pattern in ['message', 'room', 'event', 'conversation']): logger.info(f"Found messaging-related key: {key_str}") found_messages = True break count += 1 if not found_messages and found_keys: logger.info(f"Sample of keys found: {', '.join(found_keys[:5])}") db.close() if found_messages: logger.info("LevelDB database contains message-related data") else: logger.info("No message-related data found in LevelDB database") return found_messages except Exception as e: logger.info(f"Error validating LevelDB {db_path}: {e}") return False def _get_connection(self, db_path: Path, db_type: str) -> Union[sqlite3.Connection, 'plyvel.DB']: """Get read-only connection to database.""" if db_type == 'sqlite': if str(db_path) not in self.connections: conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) conn.row_factory = sqlite3.Row self.connections[str(db_path)] = conn return self.connections[str(db_path)] elif db_type == 'leveldb' and LEVELDB_AVAILABLE: if str(db_path) not in self.leveldb_connections: db = plyvel.DB(str(db_path), create_if_missing=False) self.leveldb_connections[str(db_path)] = db return self.leveldb_connections[str(db_path)] else: raise ValueError(f"Unsupported database type: {db_type}") def _detect_schema(self, conn: Union[sqlite3.Connection, 'plyvel.DB'], db_type: str) -> str: """Detect database schema type.""" if db_type == 'sqlite': cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") tables = {row[0] for row in cursor.fetchall()} if {'events', 'rooms'}.issubset(tables): return 'matrix' elif {'messages', 'conversations'}.issubset(tables): return 'generic' else: return 'unknown' elif db_type == 'leveldb': # For LevelDB/IndexedDB, we'll use a different detection method return 'indexeddb' else: return 'unknown' def list_conversations(self, limit: int = 20) -> List[Dict[str, Any]]: """List recent conversations with metadata.""" conversations = [] for db_path, db_type in self.db_paths: try: conn = self._get_connection(db_path, db_type) schema = self._detect_schema(conn, db_type) if schema == 'matrix': conversations.extend(self._list_matrix_rooms(conn, limit)) elif schema == 'generic': conversations.extend(self._list_generic_conversations(conn, limit)) elif schema == 'indexeddb': conversations.extend(self._list_indexeddb_conversations(conn, limit)) else: # Try fallback queries for SQLite only if db_type == 'sqlite': conversations.extend(self._list_fallback_conversations(conn, limit)) except Exception as e: logger.error(f"Error reading from {db_path}: {e}") # Sort by last activity and limit conversations.sort(key=lambda x: x.get('last_activity', ''), reverse=True) return conversations[:limit] def _list_matrix_rooms(self, conn: sqlite3.Connection, limit: int) -> List[Dict[str, Any]]: """List rooms from Matrix/Element schema.""" cursor = conn.cursor() query = """ SELECT DISTINCT r.room_id, r.name, r.topic, MAX(e.origin_server_ts) as last_activity, COUNT(DISTINCT e.sender) as participant_count FROM rooms r LEFT JOIN events e ON r.room_id = e.room_id WHERE e.type = 'm.room.message' GROUP BY r.room_id ORDER BY last_activity DESC LIMIT ? """ try: cursor.execute(query, (limit,)) rooms = [] for row in cursor: # Get last message preview preview_cursor = conn.cursor() preview_cursor.execute(""" SELECT content FROM events WHERE room_id = ? AND type = 'm.room.message' ORDER BY origin_server_ts DESC LIMIT 1 """, (row['room_id'],)) preview_row = preview_cursor.fetchone() preview = "" if preview_row and preview_row['content']: try: content = json.loads(preview_row['content']) preview = content.get('body', '')[:100] except: preview = str(preview_row['content'])[:100] rooms.append({ 'id': row['room_id'], 'title': row['name'] or row['room_id'], 'topic': row['topic'], 'last_activity': datetime.fromtimestamp(row['last_activity']/1000).isoformat() if row['last_activity'] else None, 'participant_count': row['participant_count'], 'last_message_preview': preview, 'source': 'matrix' }) return rooms except Exception as e: logger.error(f"Error in Matrix query: {e}") return [] def _list_generic_conversations(self, conn: sqlite3.Connection, limit: int) -> List[Dict[str, Any]]: """List conversations from generic schema.""" cursor = conn.cursor() # Try to determine table structure try: cursor.execute("PRAGMA table_info(conversations)") conv_columns = {row[1] for row in cursor.fetchall()} cursor.execute("PRAGMA table_info(messages)") msg_columns = {row[1] for row in cursor.fetchall()} # Build dynamic query based on available columns select_fields = ['c.id'] if 'title' in conv_columns: select_fields.append('c.title') elif 'name' in conv_columns: select_fields.append('c.name as title') else: select_fields.append("'Conversation' as title") query = f""" SELECT {', '.join(select_fields)}, MAX(m.timestamp) as last_activity, COUNT(DISTINCT m.sender_id) as participant_count, m.content as last_message FROM conversations c LEFT JOIN messages m ON c.id = m.conversation_id GROUP BY c.id ORDER BY last_activity DESC LIMIT ? """ cursor.execute(query, (limit,)) conversations = [] for row in cursor: conversations.append({ 'id': str(row['id']), 'title': row.get('title', f"Conversation {row['id']}"), 'last_activity': datetime.fromtimestamp(row['last_activity']).isoformat() if row['last_activity'] else None, 'participant_count': row['participant_count'], 'last_message_preview': str(row['last_message'])[:100] if row['last_message'] else '', 'source': 'generic' }) return conversations except Exception as e: logger.error(f"Error in generic query: {e}") return [] def _list_fallback_conversations(self, conn: sqlite3.Connection, limit: int) -> List[Dict[str, Any]]: """Fallback conversation listing for unknown schemas.""" cursor = conn.cursor() conversations = [] try: # Find any table with 'room' or 'conversation' in name cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND (name LIKE '%room%' OR name LIKE '%conversation%')") table_names = [row[0] for row in cursor.fetchall()] for table in table_names[:1]: # Try first matching table cursor.execute(f"PRAGMA table_info({table})") columns = {row[1] for row in cursor.fetchall()} if 'id' in columns: cursor.execute(f"SELECT * FROM {table} LIMIT {limit}") for row in cursor: conversations.append({ 'id': str(row['id']), 'title': f"{table} {row['id']}", 'source': 'fallback' }) except Exception as e: logger.error(f"Error in fallback query: {e}") return conversations def read_messages(self, conversation_id: str, limit: int = 20) -> List[Dict[str, Any]]: """Read recent messages from a conversation.""" messages = [] for db_path, db_type in self.db_paths: try: conn = self._get_connection(db_path, db_type) schema = self._detect_schema(conn, db_type) if schema == 'matrix': messages.extend(self._read_matrix_messages(conn, conversation_id, limit)) elif schema == 'generic': messages.extend(self._read_generic_messages(conn, conversation_id, limit)) elif schema == 'indexeddb': messages.extend(self._read_indexeddb_messages(conn, conversation_id, limit)) if messages: # Stop if we found messages break except Exception as e: logger.error(f"Error reading messages from {db_path}: {e}") return messages[:limit] def _read_matrix_messages(self, conn: sqlite3.Connection, room_id: str, limit: int) -> List[Dict[str, Any]]: """Read messages from Matrix/Element schema.""" cursor = conn.cursor() query = """ SELECT event_id, sender, origin_server_ts, content, type FROM events WHERE room_id = ? AND type = 'm.room.message' ORDER BY origin_server_ts DESC LIMIT ? """ cursor.execute(query, (room_id, limit)) messages = [] for row in cursor: content_data = {} body = "" if row['content']: try: content_data = json.loads(row['content']) body = content_data.get('body', '') except: body = str(row['content']) messages.append({ 'id': row['event_id'], 'sender': row['sender'], 'timestamp': datetime.fromtimestamp(row['origin_server_ts']/1000).isoformat(), 'content': body, 'type': content_data.get('msgtype', 'text'), 'source': 'matrix' }) messages.reverse() # Return in chronological order return messages def _read_generic_messages(self, conn: sqlite3.Connection, conversation_id: str, limit: int) -> List[Dict[str, Any]]: """Read messages from generic schema.""" cursor = conn.cursor() try: query = """ SELECT * FROM messages WHERE conversation_id = ? ORDER BY timestamp DESC LIMIT ? """ cursor.execute(query, (conversation_id, limit)) messages = [] for row in cursor: messages.append({ 'id': str(row.get('id', '')), 'sender': str(row.get('sender_id', row.get('sender', 'Unknown'))), 'timestamp': datetime.fromtimestamp(row['timestamp']).isoformat() if row.get('timestamp') else None, 'content': str(row.get('content', row.get('text', ''))), 'source': 'generic' }) messages.reverse() return messages except Exception as e: logger.error(f"Error reading generic messages: {e}") return [] def search_messages(self, query: str, limit: int = 20) -> List[Dict[str, Any]]: """Search messages across all conversations.""" results = [] search_term = f"%{query}%" for db_path, db_type in self.db_paths: try: conn = self._get_connection(db_path, db_type) schema = self._detect_schema(conn, db_type) if schema == 'matrix': results.extend(self._search_matrix_messages(conn, search_term, limit)) elif schema == 'generic': results.extend(self._search_generic_messages(conn, search_term, limit)) elif schema == 'indexeddb': results.extend(self._search_indexeddb_messages(conn, query, limit)) except Exception as e: logger.error(f"Error searching in {db_path}: {e}") # Sort by relevance (simple: exact matches first) results.sort(key=lambda x: query.lower() in x['content'].lower(), reverse=True) return results[:limit] def _search_matrix_messages(self, conn: sqlite3.Connection, search_term: str, limit: int) -> List[Dict[str, Any]]: """Search messages in Matrix schema.""" cursor = conn.cursor() query = """ SELECT e.event_id, e.sender, e.room_id, e.origin_server_ts, e.content, r.name as room_name FROM events e LEFT JOIN rooms r ON e.room_id = r.room_id WHERE e.type = 'm.room.message' AND e.content LIKE ? ORDER BY e.origin_server_ts DESC LIMIT ? """ cursor.execute(query, (search_term, limit)) results = [] for row in cursor: body = "" if row['content']: try: content_data = json.loads(row['content']) body = content_data.get('body', '') except: body = str(row['content']) results.append({ 'id': row['event_id'], 'conversation_id': row['room_id'], 'conversation_title': row['room_name'] or row['room_id'], 'sender': row['sender'], 'timestamp': datetime.fromtimestamp(row['origin_server_ts']/1000).isoformat(), 'content': body, 'source': 'matrix' }) return results def _search_generic_messages(self, conn: sqlite3.Connection, search_term: str, limit: int) -> List[Dict[str, Any]]: """Search messages in generic schema.""" cursor = conn.cursor() try: query = """ SELECT m.*, c.title as conversation_title FROM messages m LEFT JOIN conversations c ON m.conversation_id = c.id WHERE m.content LIKE ? ORDER BY m.timestamp DESC LIMIT ? """ cursor.execute(query, (search_term, limit)) results = [] for row in cursor: results.append({ 'id': str(row.get('id', '')), 'conversation_id': str(row.get('conversation_id', '')), 'conversation_title': row.get('conversation_title', f"Conversation {row.get('conversation_id', '')}"), 'sender': str(row.get('sender_id', row.get('sender', 'Unknown'))), 'timestamp': datetime.fromtimestamp(row['timestamp']).isoformat() if row.get('timestamp') else None, 'content': str(row.get('content', row.get('text', ''))), 'source': 'generic' }) return results except Exception as e: logger.error(f"Error searching generic messages: {e}") return [] def _list_indexeddb_conversations(self, db: 'plyvel.DB', limit: int) -> List[Dict[str, Any]]: """List conversations from IndexedDB/LevelDB.""" conversations = [] room_data = {} try: # IndexedDB stores data with specific key patterns # We need to identify room/conversation keys and message keys for key, value in db: try: key_str = key.decode('utf-8', errors='ignore') # Look for room or conversation data # IndexedDB keys often have patterns like "room:roomId" or similar if 'room' in key_str.lower() or 'conversation' in key_str.lower(): try: data = json.loads(value.decode('utf-8', errors='ignore')) if isinstance(data, dict) and any(k in data for k in ['id', 'roomId', 'room_id', 'name', 'title']): room_id = data.get('id') or data.get('roomId') or data.get('room_id', key_str) room_data[room_id] = { 'id': room_id, 'title': data.get('name') or data.get('title', room_id), 'topic': data.get('topic', ''), 'last_activity': data.get('lastActivity') or data.get('last_activity'), 'participant_count': len(data.get('members', [])) if 'members' in data else 0, 'source': 'indexeddb' } except json.JSONDecodeError: pass except Exception as e: logger.debug(f"Error processing key {key}: {e}") # Convert to list and sort by last activity conversations = list(room_data.values()) conversations.sort(key=lambda x: x.get('last_activity', ''), reverse=True) # Get last message preview for top conversations for conv in conversations[:limit]: conv['last_message_preview'] = self._get_indexeddb_last_message(db, conv['id']) except Exception as e: logger.error(f"Error listing IndexedDB conversations: {e}") return conversations[:limit] def _get_indexeddb_last_message(self, db: 'plyvel.DB', room_id: str) -> str: """Get last message preview for a room from IndexedDB.""" try: messages = [] # Look for messages related to this room for key, value in db: try: key_str = key.decode('utf-8', errors='ignore') if room_id in key_str and ('message' in key_str.lower() or 'event' in key_str.lower()): try: data = json.loads(value.decode('utf-8', errors='ignore')) if isinstance(data, dict): # Extract message content content = data.get('content') or data.get('body') or data.get('text', '') if isinstance(content, dict): content = content.get('body', '') timestamp = data.get('timestamp') or data.get('origin_server_ts', 0) if content and timestamp: messages.append((timestamp, str(content)[:100])) except json.JSONDecodeError: pass except Exception: pass # Return the most recent message if messages: messages.sort(key=lambda x: x[0], reverse=True) return messages[0][1] except Exception as e: logger.debug(f"Error getting last message for room {room_id}: {e}") return "" def _read_indexeddb_messages(self, db: 'plyvel.DB', conversation_id: str, limit: int) -> List[Dict[str, Any]]: """Read messages from IndexedDB/LevelDB.""" messages = [] try: # Collect all messages for this conversation for key, value in db: try: key_str = key.decode('utf-8', errors='ignore') if conversation_id in key_str and ('message' in key_str.lower() or 'event' in key_str.lower()): try: data = json.loads(value.decode('utf-8', errors='ignore')) if isinstance(data, dict): # Extract message data content = data.get('content') or data.get('body') or data.get('text', '') if isinstance(content, dict): content = content.get('body', '') message = { 'id': data.get('id') or data.get('event_id', key_str), 'sender': data.get('sender') or data.get('user_id') or data.get('from', 'Unknown'), 'timestamp': None, 'content': str(content), 'type': data.get('type') or data.get('msgtype', 'text'), 'source': 'indexeddb' } # Handle timestamp ts = data.get('timestamp') or data.get('origin_server_ts') if ts: try: # Handle millisecond timestamps if ts > 10000000000: ts = ts / 1000 message['timestamp'] = datetime.fromtimestamp(ts).isoformat() except: pass if message['content']: messages.append(message) except json.JSONDecodeError: pass except Exception as e: logger.debug(f"Error processing message key {key}: {e}") # Sort by timestamp and limit messages.sort(key=lambda x: x.get('timestamp', ''), reverse=True) messages = messages[:limit] messages.reverse() # Return in chronological order except Exception as e: logger.error(f"Error reading IndexedDB messages: {e}") return messages def _search_indexeddb_messages(self, db: 'plyvel.DB', query: str, limit: int) -> List[Dict[str, Any]]: """Search messages in IndexedDB/LevelDB.""" results = [] query_lower = query.lower() try: # Search through all messages for key, value in db: try: key_str = key.decode('utf-8', errors='ignore') if 'message' in key_str.lower() or 'event' in key_str.lower(): try: data = json.loads(value.decode('utf-8', errors='ignore')) if isinstance(data, dict): # Extract content content = data.get('content') or data.get('body') or data.get('text', '') if isinstance(content, dict): content = content.get('body', '') content_str = str(content) # Check if query matches if query_lower in content_str.lower(): # Extract room/conversation ID from key room_id = '' if 'room' in key_str: parts = key_str.split('room') if len(parts) > 1: room_id = parts[1].split(':')[0].split('/')[0] result = { 'id': data.get('id') or data.get('event_id', key_str), 'conversation_id': room_id or data.get('room_id', ''), 'conversation_title': f"Room {room_id}" if room_id else "Unknown", 'sender': data.get('sender') or data.get('user_id', 'Unknown'), 'timestamp': None, 'content': content_str, 'source': 'indexeddb' } # Handle timestamp ts = data.get('timestamp') or data.get('origin_server_ts') if ts: try: if ts > 10000000000: ts = ts / 1000 result['timestamp'] = datetime.fromtimestamp(ts).isoformat() except: pass results.append(result) if len(results) >= limit * 2: # Get extra to allow for sorting break except json.JSONDecodeError: pass except Exception: pass except Exception as e: logger.error(f"Error searching IndexedDB: {e}") return results[:limit] def close(self): """Close all database connections.""" for conn in self.connections.values(): conn.close() self.connections.clear() for db in self.leveldb_connections.values(): db.close() self.leveldb_connections.clear()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mimen/beeper-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server