"""Beeper database reader for MCP server."""
import sqlite3
import json
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple, Union
from datetime import datetime
logger = logging.getLogger(__name__)
try:
import plyvel
LEVELDB_AVAILABLE = True
except ImportError:
LEVELDB_AVAILABLE = False
logger.warning("plyvel not available - LevelDB support disabled")
class BeeperReader:
"""Read-only access to Beeper/Element message databases."""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.db_paths = self._discover_databases()
self.connections: Dict[str, sqlite3.Connection] = {}
self.leveldb_connections: Dict[str, 'plyvel.DB'] = {}
def _discover_databases(self) -> List[Tuple[Path, str]]:
"""Discover Beeper/Element databases in common locations.
Returns list of tuples: (path, type) where type is 'sqlite' or 'leveldb'
"""
found_dbs = []
logger.info("Starting database discovery...")
        for path_str in self.config.get('database_paths', []):
            search_path = Path(path_str).expanduser()
            logger.info(f"Checking path: {search_path}")
            if not search_path.exists():
                logger.info(f"Path does not exist: {search_path}")
                continue
            logger.info(f"Path exists: {search_path}")
# Search for SQLite databases
patterns = ['*.db', '*.sqlite', '*.sqlite3']
for pattern in patterns:
logger.info(f"Searching for SQLite pattern: {pattern}")
for db_file in search_path.glob(f"**/{pattern}"):
logger.info(f"Found potential SQLite database: {db_file}")
if self._validate_sqlite_database(db_file):
found_dbs.append((db_file, 'sqlite'))
logger.info(f"Validated SQLite database: {db_file}")
else:
logger.info(f"Invalid SQLite database: {db_file}")
# Search for LevelDB databases (IndexedDB)
if LEVELDB_AVAILABLE:
logger.info("LevelDB support is available")
# Look for IndexedDB LevelDB directories
            # '*.leveldb' already matches '*.indexeddb.leveldb', so a single
            # pattern avoids adding the same directory twice
            leveldb_patterns = ['*.leveldb']
            for pattern in leveldb_patterns:
logger.info(f"Searching for LevelDB pattern: {pattern}")
for db_dir in search_path.glob(f"**/{pattern}"):
logger.info(f"Found potential LevelDB database: {db_dir}")
if db_dir.is_dir():
logger.info(f"Directory exists: {db_dir}")
if self._validate_leveldb_database(db_dir):
found_dbs.append((db_dir, 'leveldb'))
logger.info(f"Validated LevelDB database: {db_dir}")
else:
logger.info(f"Invalid LevelDB database: {db_dir}")
else:
logger.info(f"Not a directory: {db_dir}")
else:
logger.info("LevelDB support is not available")
return found_dbs
    def _validate_sqlite_database(self, db_path: Path) -> bool:
        """Check if a SQLite database contains message-related tables."""
        conn = None
        try:
            logger.info(f"Validating SQLite database: {db_path}")
            conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
            cursor = conn.cursor()
            # Check for common table names
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = {row[0] for row in cursor.fetchall()}
            logger.info(f"Found tables in {db_path}: {', '.join(tables)}")
            # Matrix/Element schema
            if {'events', 'rooms'}.issubset(tables):
                logger.info(f"Found Matrix/Element tables in {db_path}")
                return True
            # Generic messaging schema
            if {'messages', 'conversations'}.issubset(tables):
                logger.info(f"Found generic messaging tables in {db_path}")
                return True
            # Beeper-specific tables
            message_tables = [t for t in tables if 'message' in t.lower() or 'room' in t.lower()]
            if message_tables:
                logger.info(f"Found Beeper-specific tables in {db_path}: {', '.join(message_tables)}")
                return True
            logger.info(f"No relevant tables found in {db_path}")
            return False
        except Exception as e:
            logger.debug(f"Error validating SQLite {db_path}: {e}")
            return False
        finally:
            # Always release the connection, even on early return or error
            if conn is not None:
                conn.close()
    def _validate_leveldb_database(self, db_path: Path) -> bool:
        """Check if a LevelDB database contains message-related data."""
        if not LEVELDB_AVAILABLE:
            logger.info("LevelDB validation skipped - plyvel not available")
            return False
        db = None
        try:
            logger.info(f"Validating LevelDB database: {db_path}")
            # Open without creating; this fails if another process holds the LevelDB lock
            db = plyvel.DB(str(db_path), create_if_missing=False)
            logger.info("Successfully opened LevelDB database")
            # Sample up to the first 100 keys for patterns that suggest a
            # messaging store; IndexedDB typically uses structured key prefixes
            found_messages = False
            found_keys = []
            for count, (key, value) in enumerate(db):
                if count >= 100:
                    break
                key_str = key.decode('utf-8', errors='ignore').lower()
                found_keys.append(key_str)
                # Look for keys that might indicate messages or rooms
                if any(pattern in key_str for pattern in ['message', 'room', 'event', 'conversation']):
                    logger.info(f"Found messaging-related key: {key_str}")
                    found_messages = True
                    break
            if not found_messages and found_keys:
                logger.info(f"Sample of keys found: {', '.join(found_keys[:5])}")
            if found_messages:
                logger.info("LevelDB database contains message-related data")
            else:
                logger.info("No message-related data found in LevelDB database")
            return found_messages
        except Exception as e:
            logger.debug(f"Error validating LevelDB {db_path}: {e}")
            return False
        finally:
            if db is not None:
                db.close()
    def _get_connection(self, db_path: Path, db_type: str) -> Union[sqlite3.Connection, 'plyvel.DB']:
        """Get a cached connection to a database (read-only for SQLite)."""
        if db_type == 'sqlite':
            if str(db_path) not in self.connections:
                conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
                conn.row_factory = sqlite3.Row
                self.connections[str(db_path)] = conn
            return self.connections[str(db_path)]
        elif db_type == 'leveldb':
            if not LEVELDB_AVAILABLE:
                raise ValueError("LevelDB database requested but plyvel is not installed")
            if str(db_path) not in self.leveldb_connections:
                db = plyvel.DB(str(db_path), create_if_missing=False)
                self.leveldb_connections[str(db_path)] = db
            return self.leveldb_connections[str(db_path)]
        else:
            raise ValueError(f"Unsupported database type: {db_type}")
def _detect_schema(self, conn: Union[sqlite3.Connection, 'plyvel.DB'], db_type: str) -> str:
"""Detect database schema type."""
if db_type == 'sqlite':
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = {row[0] for row in cursor.fetchall()}
if {'events', 'rooms'}.issubset(tables):
return 'matrix'
elif {'messages', 'conversations'}.issubset(tables):
return 'generic'
else:
return 'unknown'
elif db_type == 'leveldb':
# For LevelDB/IndexedDB, we'll use a different detection method
return 'indexeddb'
else:
return 'unknown'
def list_conversations(self, limit: int = 20) -> List[Dict[str, Any]]:
"""List recent conversations with metadata."""
conversations = []
for db_path, db_type in self.db_paths:
try:
conn = self._get_connection(db_path, db_type)
schema = self._detect_schema(conn, db_type)
if schema == 'matrix':
conversations.extend(self._list_matrix_rooms(conn, limit))
elif schema == 'generic':
conversations.extend(self._list_generic_conversations(conn, limit))
elif schema == 'indexeddb':
conversations.extend(self._list_indexeddb_conversations(conn, limit))
else:
# Try fallback queries for SQLite only
if db_type == 'sqlite':
conversations.extend(self._list_fallback_conversations(conn, limit))
except Exception as e:
logger.error(f"Error reading from {db_path}: {e}")
        # Sort by last activity, most recent first; coerce to str so the key
        # never compares None or int timestamps against ISO strings
        conversations.sort(key=lambda x: str(x.get('last_activity') or ''), reverse=True)
return conversations[:limit]
def _list_matrix_rooms(self, conn: sqlite3.Connection, limit: int) -> List[Dict[str, Any]]:
"""List rooms from Matrix/Element schema."""
cursor = conn.cursor()
query = """
SELECT DISTINCT
r.room_id,
r.name,
r.topic,
MAX(e.origin_server_ts) as last_activity,
COUNT(DISTINCT e.sender) as participant_count
FROM rooms r
LEFT JOIN events e ON r.room_id = e.room_id
WHERE e.type = 'm.room.message'
GROUP BY r.room_id
ORDER BY last_activity DESC
LIMIT ?
"""
try:
cursor.execute(query, (limit,))
rooms = []
for row in cursor:
# Get last message preview
preview_cursor = conn.cursor()
preview_cursor.execute("""
SELECT content FROM events
WHERE room_id = ? AND type = 'm.room.message'
ORDER BY origin_server_ts DESC LIMIT 1
""", (row['room_id'],))
preview_row = preview_cursor.fetchone()
preview = ""
if preview_row and preview_row['content']:
try:
content = json.loads(preview_row['content'])
preview = content.get('body', '')[:100]
                    except Exception:
preview = str(preview_row['content'])[:100]
rooms.append({
'id': row['room_id'],
'title': row['name'] or row['room_id'],
'topic': row['topic'],
'last_activity': datetime.fromtimestamp(row['last_activity']/1000).isoformat() if row['last_activity'] else None,
'participant_count': row['participant_count'],
'last_message_preview': preview,
'source': 'matrix'
})
return rooms
except Exception as e:
logger.error(f"Error in Matrix query: {e}")
return []
def _list_generic_conversations(self, conn: sqlite3.Connection, limit: int) -> List[Dict[str, Any]]:
"""List conversations from generic schema."""
cursor = conn.cursor()
# Try to determine table structure
try:
cursor.execute("PRAGMA table_info(conversations)")
conv_columns = {row[1] for row in cursor.fetchall()}
cursor.execute("PRAGMA table_info(messages)")
msg_columns = {row[1] for row in cursor.fetchall()}
# Build dynamic query based on available columns
select_fields = ['c.id']
if 'title' in conv_columns:
select_fields.append('c.title')
elif 'name' in conv_columns:
select_fields.append('c.name as title')
else:
select_fields.append("'Conversation' as title")
query = f"""
SELECT {', '.join(select_fields)},
MAX(m.timestamp) as last_activity,
COUNT(DISTINCT m.sender_id) as participant_count,
m.content as last_message
FROM conversations c
LEFT JOIN messages m ON c.id = m.conversation_id
GROUP BY c.id
ORDER BY last_activity DESC
LIMIT ?
"""
cursor.execute(query, (limit,))
conversations = []
for row in cursor:
conversations.append({
'id': str(row['id']),
'title': row.get('title', f"Conversation {row['id']}"),
'last_activity': datetime.fromtimestamp(row['last_activity']).isoformat() if row['last_activity'] else None,
'participant_count': row['participant_count'],
'last_message_preview': str(row['last_message'])[:100] if row['last_message'] else '',
'source': 'generic'
})
return conversations
except Exception as e:
logger.error(f"Error in generic query: {e}")
return []
def _list_fallback_conversations(self, conn: sqlite3.Connection, limit: int) -> List[Dict[str, Any]]:
"""Fallback conversation listing for unknown schemas."""
cursor = conn.cursor()
conversations = []
try:
# Find any table with 'room' or 'conversation' in name
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND (name LIKE '%room%' OR name LIKE '%conversation%')")
table_names = [row[0] for row in cursor.fetchall()]
            for table in table_names[:1]:  # Try the first matching table
                cursor.execute(f'PRAGMA table_info("{table}")')
                columns = {row[1] for row in cursor.fetchall()}
                if 'id' in columns:
                    # The table name comes from sqlite_master, but quote it and
                    # parameterize the limit rather than interpolating it
                    cursor.execute(f'SELECT * FROM "{table}" LIMIT ?', (limit,))
                    for row in cursor:
                        conversations.append({
                            'id': str(row['id']),
                            'title': f"{table} {row['id']}",
                            'source': 'fallback'
                        })
except Exception as e:
logger.error(f"Error in fallback query: {e}")
return conversations
def read_messages(self, conversation_id: str, limit: int = 20) -> List[Dict[str, Any]]:
"""Read recent messages from a conversation."""
messages = []
for db_path, db_type in self.db_paths:
try:
conn = self._get_connection(db_path, db_type)
schema = self._detect_schema(conn, db_type)
if schema == 'matrix':
messages.extend(self._read_matrix_messages(conn, conversation_id, limit))
elif schema == 'generic':
messages.extend(self._read_generic_messages(conn, conversation_id, limit))
elif schema == 'indexeddb':
messages.extend(self._read_indexeddb_messages(conn, conversation_id, limit))
if messages: # Stop if we found messages
break
except Exception as e:
logger.error(f"Error reading messages from {db_path}: {e}")
return messages[:limit]
def _read_matrix_messages(self, conn: sqlite3.Connection, room_id: str, limit: int) -> List[Dict[str, Any]]:
"""Read messages from Matrix/Element schema."""
cursor = conn.cursor()
query = """
SELECT
event_id,
sender,
origin_server_ts,
content,
type
FROM events
WHERE room_id = ? AND type = 'm.room.message'
ORDER BY origin_server_ts DESC
LIMIT ?
"""
cursor.execute(query, (room_id, limit))
messages = []
for row in cursor:
content_data = {}
body = ""
            if row['content']:
                try:
                    content_data = json.loads(row['content'])
                    body = content_data.get('body', '')
                except Exception:
                    content_data = {}  # Keep a dict so .get('msgtype') below stays safe
                    body = str(row['content'])
messages.append({
'id': row['event_id'],
'sender': row['sender'],
'timestamp': datetime.fromtimestamp(row['origin_server_ts']/1000).isoformat(),
'content': body,
'type': content_data.get('msgtype', 'text'),
'source': 'matrix'
})
messages.reverse() # Return in chronological order
return messages
def _read_generic_messages(self, conn: sqlite3.Connection, conversation_id: str, limit: int) -> List[Dict[str, Any]]:
"""Read messages from generic schema."""
cursor = conn.cursor()
try:
query = """
SELECT * FROM messages
WHERE conversation_id = ?
ORDER BY timestamp DESC
LIMIT ?
"""
cursor.execute(query, (conversation_id, limit))
            messages = []
            for row in cursor:
                data = dict(row)  # sqlite3.Row has no .get(); convert to a dict first
                messages.append({
                    'id': str(data.get('id', '')),
                    'sender': str(data.get('sender_id') or data.get('sender') or 'Unknown'),
                    'timestamp': datetime.fromtimestamp(data['timestamp']).isoformat() if data.get('timestamp') else None,
                    'content': str(data.get('content') or data.get('text', '')),
                    'source': 'generic'
                })
            messages.reverse()  # Return in chronological order
return messages
except Exception as e:
logger.error(f"Error reading generic messages: {e}")
return []
def search_messages(self, query: str, limit: int = 20) -> List[Dict[str, Any]]:
"""Search messages across all conversations."""
results = []
search_term = f"%{query}%"
for db_path, db_type in self.db_paths:
try:
conn = self._get_connection(db_path, db_type)
schema = self._detect_schema(conn, db_type)
if schema == 'matrix':
results.extend(self._search_matrix_messages(conn, search_term, limit))
elif schema == 'generic':
results.extend(self._search_generic_messages(conn, search_term, limit))
elif schema == 'indexeddb':
results.extend(self._search_indexeddb_messages(conn, query, limit))
except Exception as e:
logger.error(f"Error searching in {db_path}: {e}")
        # Simple relevance sort: results whose extracted body contains the
        # query come first (a LIKE match on raw JSON can hit metadata instead)
        results.sort(key=lambda x: query.lower() in (x.get('content') or '').lower(), reverse=True)
return results[:limit]
def _search_matrix_messages(self, conn: sqlite3.Connection, search_term: str, limit: int) -> List[Dict[str, Any]]:
"""Search messages in Matrix schema."""
cursor = conn.cursor()
query = """
SELECT
e.event_id,
e.sender,
e.room_id,
e.origin_server_ts,
e.content,
r.name as room_name
FROM events e
LEFT JOIN rooms r ON e.room_id = r.room_id
WHERE e.type = 'm.room.message' AND e.content LIKE ?
ORDER BY e.origin_server_ts DESC
LIMIT ?
"""
cursor.execute(query, (search_term, limit))
results = []
for row in cursor:
body = ""
if row['content']:
try:
content_data = json.loads(row['content'])
body = content_data.get('body', '')
                except Exception:
body = str(row['content'])
results.append({
'id': row['event_id'],
'conversation_id': row['room_id'],
'conversation_title': row['room_name'] or row['room_id'],
'sender': row['sender'],
'timestamp': datetime.fromtimestamp(row['origin_server_ts']/1000).isoformat(),
'content': body,
'source': 'matrix'
})
return results
def _search_generic_messages(self, conn: sqlite3.Connection, search_term: str, limit: int) -> List[Dict[str, Any]]:
"""Search messages in generic schema."""
cursor = conn.cursor()
try:
query = """
SELECT m.*, c.title as conversation_title
FROM messages m
LEFT JOIN conversations c ON m.conversation_id = c.id
WHERE m.content LIKE ?
ORDER BY m.timestamp DESC
LIMIT ?
"""
cursor.execute(query, (search_term, limit))
results = []
            for row in cursor:
                data = dict(row)  # sqlite3.Row has no .get(); convert to a dict first
                results.append({
                    'id': str(data.get('id', '')),
                    'conversation_id': str(data.get('conversation_id', '')),
                    'conversation_title': data.get('conversation_title') or f"Conversation {data.get('conversation_id', '')}",
                    'sender': str(data.get('sender_id') or data.get('sender') or 'Unknown'),
                    'timestamp': datetime.fromtimestamp(data['timestamp']).isoformat() if data.get('timestamp') else None,
                    'content': str(data.get('content') or data.get('text', '')),
                    'source': 'generic'
                })
return results
except Exception as e:
logger.error(f"Error searching generic messages: {e}")
return []
def _list_indexeddb_conversations(self, db: 'plyvel.DB', limit: int) -> List[Dict[str, Any]]:
"""List conversations from IndexedDB/LevelDB."""
conversations = []
room_data = {}
try:
# IndexedDB stores data with specific key patterns
# We need to identify room/conversation keys and message keys
for key, value in db:
try:
key_str = key.decode('utf-8', errors='ignore')
# Look for room or conversation data
# IndexedDB keys often have patterns like "room:roomId" or similar
if 'room' in key_str.lower() or 'conversation' in key_str.lower():
try:
data = json.loads(value.decode('utf-8', errors='ignore'))
if isinstance(data, dict) and any(k in data for k in ['id', 'roomId', 'room_id', 'name', 'title']):
room_id = data.get('id') or data.get('roomId') or data.get('room_id', key_str)
room_data[room_id] = {
'id': room_id,
'title': data.get('name') or data.get('title', room_id),
'topic': data.get('topic', ''),
'last_activity': data.get('lastActivity') or data.get('last_activity'),
'participant_count': len(data.get('members', [])) if 'members' in data else 0,
'source': 'indexeddb'
}
except json.JSONDecodeError:
pass
except Exception as e:
logger.debug(f"Error processing key {key}: {e}")
            # Convert to a list and sort by last activity; coerce to str so
            # mixed int/str/None timestamps still compare
            conversations = list(room_data.values())
            conversations.sort(key=lambda x: str(x.get('last_activity') or ''), reverse=True)
# Get last message preview for top conversations
for conv in conversations[:limit]:
conv['last_message_preview'] = self._get_indexeddb_last_message(db, conv['id'])
except Exception as e:
logger.error(f"Error listing IndexedDB conversations: {e}")
return conversations[:limit]
def _get_indexeddb_last_message(self, db: 'plyvel.DB', room_id: str) -> str:
"""Get last message preview for a room from IndexedDB."""
try:
messages = []
            # Full scan: LevelDB has no secondary index, so every key is checked for this room
for key, value in db:
try:
key_str = key.decode('utf-8', errors='ignore')
if room_id in key_str and ('message' in key_str.lower() or 'event' in key_str.lower()):
try:
data = json.loads(value.decode('utf-8', errors='ignore'))
if isinstance(data, dict):
# Extract message content
content = data.get('content') or data.get('body') or data.get('text', '')
if isinstance(content, dict):
content = content.get('body', '')
timestamp = data.get('timestamp') or data.get('origin_server_ts', 0)
if content and timestamp:
messages.append((timestamp, str(content)[:100]))
except json.JSONDecodeError:
pass
except Exception:
pass
# Return the most recent message
if messages:
messages.sort(key=lambda x: x[0], reverse=True)
return messages[0][1]
except Exception as e:
logger.debug(f"Error getting last message for room {room_id}: {e}")
return ""
def _read_indexeddb_messages(self, db: 'plyvel.DB', conversation_id: str, limit: int) -> List[Dict[str, Any]]:
"""Read messages from IndexedDB/LevelDB."""
messages = []
try:
# Collect all messages for this conversation
for key, value in db:
try:
key_str = key.decode('utf-8', errors='ignore')
if conversation_id in key_str and ('message' in key_str.lower() or 'event' in key_str.lower()):
try:
data = json.loads(value.decode('utf-8', errors='ignore'))
if isinstance(data, dict):
# Extract message data
content = data.get('content') or data.get('body') or data.get('text', '')
if isinstance(content, dict):
content = content.get('body', '')
message = {
'id': data.get('id') or data.get('event_id', key_str),
'sender': data.get('sender') or data.get('user_id') or data.get('from', 'Unknown'),
'timestamp': None,
'content': str(content),
'type': data.get('type') or data.get('msgtype', 'text'),
'source': 'indexeddb'
}
# Handle timestamp
ts = data.get('timestamp') or data.get('origin_server_ts')
if ts:
                                    try:
                                        # Normalize millisecond epochs (13 digits) to seconds
                                        if ts > 10_000_000_000:
                                            ts = ts / 1000
                                        message['timestamp'] = datetime.fromtimestamp(ts).isoformat()
                                    except Exception:
                                        pass
if message['content']:
messages.append(message)
except json.JSONDecodeError:
pass
except Exception as e:
logger.debug(f"Error processing message key {key}: {e}")
            # Sort newest-first (None timestamps sort last), trim, then flip
            # back to chronological order
            messages.sort(key=lambda x: x.get('timestamp') or '', reverse=True)
            messages = messages[:limit]
            messages.reverse()
except Exception as e:
logger.error(f"Error reading IndexedDB messages: {e}")
return messages
def _search_indexeddb_messages(self, db: 'plyvel.DB', query: str, limit: int) -> List[Dict[str, Any]]:
"""Search messages in IndexedDB/LevelDB."""
results = []
query_lower = query.lower()
try:
# Search through all messages
for key, value in db:
try:
key_str = key.decode('utf-8', errors='ignore')
if 'message' in key_str.lower() or 'event' in key_str.lower():
try:
data = json.loads(value.decode('utf-8', errors='ignore'))
if isinstance(data, dict):
# Extract content
content = data.get('content') or data.get('body') or data.get('text', '')
if isinstance(content, dict):
content = content.get('body', '')
content_str = str(content)
# Check if query matches
if query_lower in content_str.lower():
                                    # Best-effort: extract the room/conversation ID from the
                                    # key, matching 'room' case-insensitively but preserving
                                    # the ID's original casing
                                    room_id = ''
                                    idx = key_str.lower().find('room')
                                    if idx != -1:
                                        tail = key_str[idx + 4:]  # text after 'room'
                                        room_id = tail.split(':')[0].split('/')[0]
result = {
'id': data.get('id') or data.get('event_id', key_str),
'conversation_id': room_id or data.get('room_id', ''),
'conversation_title': f"Room {room_id}" if room_id else "Unknown",
'sender': data.get('sender') or data.get('user_id', 'Unknown'),
'timestamp': None,
'content': content_str,
'source': 'indexeddb'
}
# Handle timestamp
ts = data.get('timestamp') or data.get('origin_server_ts')
if ts:
                                        try:
                                            # Normalize millisecond epochs (13 digits) to seconds
                                            if ts > 10_000_000_000:
                                                ts = ts / 1000
                                            result['timestamp'] = datetime.fromtimestamp(ts).isoformat()
                                        except Exception:
                                            pass
results.append(result)
if len(results) >= limit * 2: # Get extra to allow for sorting
break
except json.JSONDecodeError:
pass
except Exception:
pass
except Exception as e:
logger.error(f"Error searching IndexedDB: {e}")
return results[:limit]
def close(self):
"""Close all database connections."""
for conn in self.connections.values():
conn.close()
self.connections.clear()
for db in self.leveldb_connections.values():
db.close()
self.leveldb_connections.clear()
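

if __name__ == '__main__':
    # Minimal usage sketch, assuming the illustrative EXAMPLE_CONFIG above
    # points at a real Beeper/Element data directory on this machine.
    logging.basicConfig(level=logging.INFO)
    reader = BeeperReader(EXAMPLE_CONFIG)
    try:
        for conv in reader.list_conversations(limit=5):
            print(f"{conv['title']}: {conv.get('last_message_preview', '')}")
            for msg in reader.read_messages(conv['id'], limit=3):
                print(f"  [{msg.get('timestamp')}] {msg['sender']}: {msg['content'][:80]}")
    finally:
        reader.close()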