test_server.py•11.2 kB
#!/usr/bin/env python3
"""Testing utilities for Beeper MCP Server."""
import argparse
import asyncio
import json
import logging
import sqlite3
import sys
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Any
from beeper_reader import BeeperReader
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BeeperTester:
"""Test utilities for Beeper MCP server."""
def __init__(self):
self.config = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""Load configuration."""
config_path = Path(__file__).parent / "config.json"
if config_path.exists():
with open(config_path, 'r') as f:
return json.load(f)
return {
"database_paths": [
"~/Library/Application Support/Beeper",
"~/.config/Beeper",
"~/Library/Application Support/Element",
"~/.config/Element"
],
"max_results": 50,
"log_level": "INFO"
}
def test_discovery(self) -> bool:
"""Test database discovery."""
print("\n=== Testing Database Discovery ===")
reader = BeeperReader(self.config)
if not reader.db_paths:
print("❌ No databases found")
print("\nSearched in:")
for path in self.config['database_paths']:
print(f" - {path}")
return False
print(f"✅ Found {len(reader.db_paths)} database(s):")
for db_path, db_type in reader.db_paths:
print(f" - {db_path} (type: {db_type})")
# Test connection
if db_type == 'sqlite':
try:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [row[0] for row in cursor.fetchall()]
print(f" Tables: {', '.join(tables[:5])}" +
(" ..." if len(tables) > 5 else ""))
conn.close()
except Exception as e:
print(f" ⚠️ Error: {e}")
elif db_type == 'leveldb':
print(f" LevelDB database detected")
reader.close()
return True
def test_with_sample_data(self) -> bool:
"""Test with sample database."""
print("\n=== Testing with Sample Data ===")
# Create temporary database
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp:
db_path = tmp.name
try:
# Create sample Matrix-style database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Create tables
cursor.execute("""
CREATE TABLE rooms (
room_id TEXT PRIMARY KEY,
name TEXT,
topic TEXT
)
""")
cursor.execute("""
CREATE TABLE events (
event_id TEXT PRIMARY KEY,
room_id TEXT,
sender TEXT,
type TEXT,
origin_server_ts INTEGER,
content TEXT
)
""")
# Insert sample data
rooms = [
('!room1:matrix.org', 'General Chat', 'General discussion'),
('!room2:matrix.org', 'Dev Team', 'Development discussions'),
('!room3:matrix.org', 'Random', None)
]
cursor.executemany("INSERT INTO rooms VALUES (?, ?, ?)", rooms)
# Insert messages
now = int(datetime.now().timestamp() * 1000)
messages = []
for i in range(30):
room_id = rooms[i % 3][0]
sender = f"@user{i % 5}:matrix.org"
content = json.dumps({
'body': f'Test message {i} in {rooms[i % 3][1]}',
'msgtype': 'm.text'
})
event_id = f"$event{i}:matrix.org"
timestamp = now - (i * 3600000) # 1 hour apart
messages.append((
event_id, room_id, sender, 'm.room.message',
timestamp, content
))
cursor.executemany(
"INSERT INTO events VALUES (?, ?, ?, ?, ?, ?)",
messages
)
conn.commit()
conn.close()
# Test with sample database
config = self.config.copy()
config['database_paths'] = [str(Path(db_path).parent)]
reader = BeeperReader(config)
# Test list_conversations
print("\nTesting list_conversations:")
conversations = reader.list_conversations(limit=5)
for conv in conversations:
print(f" - {conv['title']} (ID: {conv['id'][:20]}...)")
print(f" Last message: {conv.get('last_message_preview', 'N/A')}")
if conversations:
# Test read_messages
print("\nTesting read_messages:")
conv_id = conversations[0]['id']
messages = reader.read_messages(conv_id, limit=5)
for msg in messages[:3]:
print(f" - [{msg['timestamp']}] {msg['sender']}:")
print(f" {msg['content']}")
# Test search_messages
print("\nTesting search_messages:")
results = reader.search_messages("message", limit=5)
print(f" Found {len(results)} results")
for result in results[:3]:
print(f" - In {result['conversation_title']}: {result['content']}")
reader.close()
return True
except Exception as e:
logger.error(f"Sample data test failed: {e}")
return False
finally:
# Cleanup
Path(db_path).unlink(missing_ok=True)
def test_full_integration(self) -> bool:
"""Test full integration with actual Beeper data."""
print("\n=== Testing Full Integration ===")
reader = BeeperReader(self.config)
if not reader.db_paths:
print("⚠️ No databases found, skipping integration test")
reader.close()
return False
try:
# Test conversations
print("\n1. Listing conversations...")
conversations = reader.list_conversations(limit=3)
if not conversations:
print(" No conversations found")
reader.close()
return False
print(f" Found {len(conversations)} conversation(s)")
# Test messages
conv_id = conversations[0]['id']
print(f"\n2. Reading messages from '{conversations[0]['title']}'...")
messages = reader.read_messages(conv_id, limit=5)
if messages:
print(f" Found {len(messages)} message(s)")
else:
print(" No messages found")
# Test search
print("\n3. Searching messages...")
results = reader.search_messages("the", limit=5)
if results:
print(f" Found {len(results)} result(s)")
else:
print(" No search results")
reader.close()
return True
except Exception as e:
logger.error(f"Integration test failed: {e}")
reader.close()
return False
async def test_mcp_server(self) -> bool:
"""Test MCP server startup."""
print("\n=== Testing MCP Server ===")
try:
from main import BeeperMCPServer
server = BeeperMCPServer()
# Test tool listing
tools = await server.server.list_tools()
print(f"✅ Server initialized with {len(tools)} tools:")
for tool in tools:
print(f" - {tool.name}: {tool.description}")
# Test a tool call
print("\nTesting list_conversations tool:")
result = await server.server.call_tool(
"list_conversations",
{"limit": 2}
)
if result and result[0].text:
print("✅ Tool call successful")
print(f"Response preview: {result[0].text[:200]}...")
else:
print("⚠️ Tool returned empty result")
server.reader.close()
return True
except Exception as e:
logger.error(f"MCP server test failed: {e}")
return False
async def main():
"""Main test runner."""
parser = argparse.ArgumentParser(description='Test Beeper MCP Server')
parser.add_argument('--discover', action='store_true',
help='Test database discovery only')
parser.add_argument('--sample', action='store_true',
help='Test with sample data')
parser.add_argument('--full', action='store_true',
help='Run full integration test')
parser.add_argument('--server', action='store_true',
help='Test MCP server')
parser.add_argument('--all', action='store_true',
help='Run all tests')
args = parser.parse_args()
# Default to discovery test if no options
if not any([args.discover, args.sample, args.full, args.server, args.all]):
args.discover = True
tester = BeeperTester()
results = []
if args.discover or args.all:
results.append(("Database Discovery", tester.test_discovery()))
if args.sample or args.all:
results.append(("Sample Data", tester.test_with_sample_data()))
if args.full or args.all:
results.append(("Full Integration", tester.test_full_integration()))
if args.server or args.all:
results.append(("MCP Server", await tester.test_mcp_server()))
# Summary
print("\n=== Test Summary ===")
passed = 0
for test_name, result in results:
status = "✅ PASS" if result else "❌ FAIL"
print(f"{test_name}: {status}")
if result:
passed += 1
print(f"\nTotal: {passed}/{len(results)} passed")
return 0 if passed == len(results) else 1
if __name__ == "__main__":
sys.exit(asyncio.run(main()))