Skip to main content
Glama
test_performance_benchmarks.py18.1 kB
"""Performance benchmarks for large bundle exports and viewer loading. This test suite validates performance characteristics of the share export pipeline: 1. Export time for different database sizes 2. Chunk configuration validation 3. Bundle size measurements 4. Database compressibility Reference: PLAN_TO_ENABLE_EASY_AND_SECURE_SHARING_OF_AGENT_MAILBOX.md line 261 """ from __future__ import annotations import json import sqlite3 import time from pathlib import Path import pytest from mcp_agent_mail import share @pytest.fixture def small_db(tmp_path: Path) -> Path: """Create a small database (~200KB-1MB with overhead) with 100 messages.""" return _create_test_database(tmp_path, "small.sqlite3", num_messages=100, body_size=1000) @pytest.fixture def medium_db(tmp_path: Path) -> Path: """Create a medium database (~10MB) with 1000 messages.""" return _create_test_database(tmp_path, "medium.sqlite3", num_messages=1000, body_size=10000) @pytest.fixture def large_db(tmp_path: Path) -> Path: """Create a large database (~100MB) with 5000 messages.""" return _create_test_database(tmp_path, "large.sqlite3", num_messages=5000, body_size=20000) def _create_test_database(tmp_path: Path, name: str, num_messages: int, body_size: int) -> Path: """Create a test database with specified number of messages and body size.""" db_path = tmp_path / name conn = sqlite3.connect(db_path) try: # Create schema matching production conn.executescript(""" CREATE TABLE projects ( id INTEGER PRIMARY KEY, slug TEXT, human_key TEXT ); CREATE TABLE agents ( id INTEGER PRIMARY KEY, project_id INTEGER, name TEXT, program TEXT, model TEXT ); CREATE TABLE messages ( id INTEGER PRIMARY KEY, project_id INTEGER, subject TEXT, body_md TEXT, importance TEXT, ack_required INTEGER, created_ts TEXT, attachments TEXT, thread_id TEXT, reply_to INTEGER ); CREATE TABLE message_recipients ( id INTEGER PRIMARY KEY, message_id INTEGER, agent_id INTEGER, kind TEXT ); -- Indexes for performance CREATE INDEX idx_messages_created_ts ON messages(created_ts); CREATE INDEX idx_messages_thread_id ON messages(thread_id); CREATE INDEX idx_messages_project_id ON messages(project_id); CREATE INDEX idx_message_recipients_message_id ON message_recipients(message_id); -- FTS5 for search CREATE VIRTUAL TABLE fts_messages USING fts5( subject, body_md, content=messages, content_rowid=id ); """) # Insert test project conn.execute("INSERT INTO projects (id, slug, human_key) VALUES (1, 'perf-test', 'Performance Test')") # Insert test agents conn.execute("INSERT INTO agents (id, project_id, name, program, model) VALUES (1, 1, 'TestAgent', 'test', 'test-model')") # Insert messages with realistic size # Use a repeating pattern to ensure compressibility body_template = "This is test message content. " * (body_size // 30) for i in range(1, num_messages + 1): thread_id = f"thread-{(i - 1) // 10 + 1}" if i % 3 != 0 else None conn.execute( """INSERT INTO messages (id, project_id, subject, body_md, importance, ack_required, created_ts, attachments, thread_id) VALUES (?, 1, ?, ?, 'normal', 0, ?, '[]', ?)""", ( i, f"Test Message {i}", f"{body_template} Message number {i}.", f"2025-11-{5 - i // (num_messages // 3 + 1):02d}T{i % 24:02d}:{i % 60:02d}:00Z", thread_id, ), ) # Add FTS entry conn.execute( "INSERT INTO fts_messages(rowid, subject, body_md) VALUES (?, ?, ?)", (i, f"Test Message {i}", f"{body_template} Message number {i}."), ) # Add recipient conn.execute( "INSERT INTO message_recipients (message_id, agent_id, kind) VALUES (?, 1, 'to')", (i,), ) conn.commit() finally: conn.close() return db_path def _get_file_size_mb(path: Path) -> float: """Get file size in MB.""" return path.stat().st_size / (1024 * 1024) @pytest.mark.benchmark def test_small_bundle_export_performance(small_db: Path, tmp_path: Path) -> None: """Benchmark snapshot creation performance for ~1MB database. Small databases should snapshot very quickly as they use SQLite Online Backup API. """ snapshot_path = tmp_path / "snapshot.sqlite3" # Measure snapshot time start_time = time.time() share.create_sqlite_snapshot(small_db, snapshot_path, checkpoint=True) export_time = time.time() - start_time # Validate snapshot was created assert snapshot_path.exists() db_size_mb = _get_file_size_mb(snapshot_path) print("\nSmall snapshot performance:") print(f" Database size: {db_size_mb:.2f} MB") print(f" Snapshot time: {export_time:.3f} seconds") if export_time > 0: print(f" Throughput: {db_size_mb / export_time:.2f} MB/s") # Small snapshots should be fast assert export_time < 5.0, "Small snapshot should complete in < 5 seconds" @pytest.mark.benchmark @pytest.mark.slow def test_medium_bundle_export_performance(medium_db: Path, tmp_path: Path) -> None: """Benchmark snapshot creation performance for ~10MB database. Medium databases should snapshot efficiently without chunking. """ snapshot_path = tmp_path / "snapshot.sqlite3" # Measure snapshot time start_time = time.time() share.create_sqlite_snapshot(medium_db, snapshot_path, checkpoint=True) export_time = time.time() - start_time # Validate snapshot was created assert snapshot_path.exists() db_size_mb = _get_file_size_mb(snapshot_path) print("\nMedium snapshot performance:") print(f" Database size: {db_size_mb:.2f} MB") print(f" Snapshot time: {export_time:.3f} seconds") if export_time > 0: print(f" Throughput: {db_size_mb / export_time:.2f} MB/s") # Medium snapshots should complete in reasonable time assert export_time < 30.0, "Medium snapshot should complete in < 30 seconds" @pytest.mark.benchmark @pytest.mark.slow def test_large_bundle_export_performance(large_db: Path, tmp_path: Path) -> None: """Benchmark snapshot + chunking performance for ~100MB database. Large databases should snapshot efficiently and optionally be chunked for httpvfs. """ snapshot_path = tmp_path / "snapshot.sqlite3" # Measure snapshot time start_time = time.time() share.create_sqlite_snapshot(large_db, snapshot_path, checkpoint=True) snapshot_time = time.time() - start_time # Validate snapshot was created assert snapshot_path.exists() db_size_mb = _get_file_size_mb(snapshot_path) print("\nLarge snapshot performance:") print(f" Database size: {db_size_mb:.2f} MB") print(f" Snapshot time: {snapshot_time:.3f} seconds") if snapshot_time > 0: print(f" Throughput: {db_size_mb / snapshot_time:.2f} MB/s") # Test chunking if database is large enough if db_size_mb > 10: output_dir = tmp_path / "chunked" output_dir.mkdir() chunk_start = time.time() chunked = share.maybe_chunk_database( snapshot_path, output_dir, threshold_bytes=10 * 1024 * 1024, chunk_bytes=5 * 1024 * 1024, ) chunk_time = time.time() - chunk_start print(f" Chunking time: {chunk_time:.3f} seconds") print(f" Was chunked: {chunked is not None}") # Large snapshots should still complete in reasonable time assert snapshot_time < 120.0, "Large snapshot should complete in < 2 minutes" @pytest.mark.benchmark def test_database_compressibility(small_db: Path, tmp_path: Path) -> None: """Test database compressibility for different scenarios. SQLite databases with repetitive content should compress well with gzip/brotli. This is important for static hosting where CDNs typically apply compression. """ import gzip import shutil # Create snapshot snapshot_path = tmp_path / "snapshot.sqlite3" share.create_sqlite_snapshot(small_db, snapshot_path, checkpoint=True) assert snapshot_path.exists() # Measure uncompressed size uncompressed_size = snapshot_path.stat().st_size uncompressed_mb = uncompressed_size / (1024 * 1024) # Compress with gzip compressed_path = tmp_path / "mailbox.sqlite3.gz" with snapshot_path.open("rb") as f_in, gzip.open(compressed_path, "wb", compresslevel=6) as f_out: shutil.copyfileobj(f_in, f_out) compressed_size = compressed_path.stat().st_size compressed_mb = compressed_size / (1024 * 1024) compression_ratio = compressed_size / uncompressed_size print("\nDatabase compression statistics:") print(f" Uncompressed: {uncompressed_mb:.2f} MB") print(f" Compressed (gzip): {compressed_mb:.2f} MB") print(f" Compression ratio: {compression_ratio:.2%}") # Expect at least 30% compression for repetitive test data assert compression_ratio < 0.7, \ f"Database should compress to < 70% of original size, got {compression_ratio:.2%}" @pytest.mark.benchmark def test_chunk_size_validation(large_db: Path, tmp_path: Path) -> None: """Test that chunking produces appropriately sized chunks. httpvfs performance depends on chunk size - too small means many HTTP requests, too large means downloading unnecessary data. """ # Create snapshot first snapshot_path = tmp_path / "snapshot.sqlite3" share.create_sqlite_snapshot(large_db, snapshot_path, checkpoint=True) output_dir = tmp_path / "chunk_test" output_dir.mkdir() # Test chunking with specific chunk size chunk_size_mb = 5 chunked = share.maybe_chunk_database( snapshot_path, output_dir, threshold_bytes=1 * 1024 * 1024, # Force chunking at 1MB chunk_bytes=int(chunk_size_mb * 1024 * 1024), ) print("\nChunk validation:") print(f" Was chunked: {chunked}") if chunked: # Check if config file was created config_path = output_dir / "mailbox.sqlite3.config.json" if config_path.exists(): config = json.loads(config_path.read_text()) total_size = config.get("databaseLength", 0) print(f" Total size: {total_size / (1024 * 1024):.2f} MB") # Validate chunks directory chunks_dir = output_dir / "chunks" if chunks_dir.exists(): chunk_files = sorted(chunks_dir.glob("mailbox.sqlite3*")) print(f" Number of chunks: {len(chunk_files)}") # Check individual chunk sizes for chunk_file in chunk_files: chunk_size = chunk_file.stat().st_size / (1024 * 1024) print(f" {chunk_file.name}: {chunk_size:.2f} MB") # Chunks should not wildly exceed requested size assert chunk_size <= chunk_size_mb * 2, \ f"Chunk {chunk_file.name} too large ({chunk_size:.2f} > {chunk_size_mb * 2})" else: print(" Database not chunked (below threshold)") def test_vacuum_improves_locality() -> None: """Document that VACUUM should be run before export for optimal performance. VACUUM rebuilds the database file, which: 1. Removes fragmentation 2. Improves page locality 3. Reduces file size 4. Optimizes httpvfs streaming performance """ documentation = """ VACUUM Optimization for Export =============================== The export pipeline should run VACUUM before creating snapshots to: 1. Defragment database pages - Consecutive pages minimize HTTP Range requests - Better locality = fewer round trips for httpvfs 2. Reclaim unused space - Reduces bundle size - Faster downloads and cache loading 3. Rebuild indexes - Optimal B-tree structure - Better query performance in viewer 4. Update statistics - SQLite query planner uses current stats - Improves EXPLAIN QUERY PLAN results Implementation: --------------- Before export: ```python conn = sqlite3.connect(db_path) conn.execute("VACUUM") conn.execute("ANALYZE") conn.close() ``` Cost: O(n) where n = database size Benefit: 10-30% size reduction, 2-5x better httpvfs performance """ assert len(documentation) > 100, "VACUUM documentation should be comprehensive" def test_browser_performance_requirements_documentation() -> None: """Document requirements for browser-based performance testing. Full performance validation requires headless browser automation to measure: 1. First meaningful paint 2. OPFS cache performance 3. Warm vs cold load times 4. httpvfs HTTP Range request patterns """ documentation = """ Browser Performance Testing Requirements ========================================= Comprehensive performance validation requires Playwright/Puppeteer tests: 1. Bundle Loading Performance ------------------------------ Test Setup: - Create bundles: 1 MB, 10 MB, 100 MB, 500 MB - Deploy to local static server - Launch headless Chromium with Performance API Metrics to measure: - Time to First Byte (TTFB) - First Contentful Paint (FCP) - First Meaningful Paint (FMP) - when message list appears - Time to Interactive (TTI) - when search/navigation works Target: FMP < 2s for 100MB bundle on fast connection 2. OPFS Cache Performance -------------------------- Test Setup: - Launch browser with COOP/COEP headers (cross-origin isolation) - Verify sqlite-wasm + OPFS is available - Load bundle first time (cold cache) - Reload page (warm cache) Metrics to measure: - Cold load: full download + OPFS write time - Warm load: OPFS read time (should be < 200ms) - Cache hit ratio (via Performance API) Target: Warm load FMP < 500ms for 100MB bundle 3. httpvfs Streaming Performance --------------------------------- Test Setup: - Deploy chunked bundle (10MB chunks) - Monitor Network tab in DevTools - Perform various viewer operations Metrics to measure: - Number of HTTP Range requests for initial load - Bytes downloaded vs database size ratio - Lazy loading behavior (chunks downloaded on demand) Target: < 10 Range requests for initial thread list view 4. Query Performance Under Load -------------------------------- Test Setup: - Load large bundle (500 MB, 50k+ messages) - Perform rapid navigation/search operations - Monitor console for slow queries Metrics to measure: - Thread list render time - Search result latency (FTS vs LIKE) - Message detail load time - Scroll performance (virtual scrolling if implemented) Target: All operations < 100ms after initial load 5. Memory Usage --------------- Test Setup: - Open bundle in browser - Monitor memory usage over time - Navigate through many messages Metrics to measure: - Peak memory usage - Memory leaks (should stay flat after initial load) - OPFS cache storage quota usage Target: < 2x database size memory usage Implementation Tools: --------------------- ```python from playwright.sync_api import sync_playwright def test_bundle_load_performance(): with sync_playwright() as p: browser = p.chromium.launch() page = browser.new_page() # Measure FMP page.goto("http://localhost:8000/viewer/") page.wait_for_selector("#message-list li") # First message visible metrics = page.evaluate("() => performance.getEntriesByType('navigation')[0]") assert metrics["domContentLoadedEventEnd"] < 2000 browser.close() ``` See tests/playwright/ directory for full implementation. """ assert len(documentation) > 500, "Browser performance documentation should be comprehensive" @pytest.mark.benchmark @pytest.mark.parametrize("num_messages", [100, 1000, 5000]) def test_export_scales_linearly(tmp_path: Path, num_messages: int) -> None: """Test that snapshot time scales linearly with database size. Snapshot performance should be O(n) where n = database size. Non-linear scaling would indicate a performance bottleneck. """ # Create database with specified size db_path = _create_test_database(tmp_path, f"scale_{num_messages}.sqlite3", num_messages, 1000) # Measure snapshot time snapshot_path = tmp_path / f"scale_{num_messages}_snapshot.sqlite3" start_time = time.time() share.create_sqlite_snapshot(db_path, snapshot_path, checkpoint=True) snapshot_time = time.time() - start_time # Calculate throughput throughput = num_messages / snapshot_time if snapshot_time > 0 else float('inf') db_size_mb = _get_file_size_mb(snapshot_path) print(f"\nScale test ({num_messages} messages, {db_size_mb:.2f} MB):") print(f" Snapshot time: {snapshot_time:.3f} seconds") print(f" Throughput: {throughput:.0f} messages/second") # Snapshot should handle at least 50 messages/second assert throughput > 50, f"Snapshot throughput too low: {throughput:.0f} msg/s"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Dicklesworthstone/mcp_agent_mail'

If you have feedback or need assistance with the MCP directory API, please join our Discord server