benchmark_hybrid_sync.py
#!/usr/bin/env python3
"""
Benchmark hybrid storage sync performance optimizations (v8.27.0).

Tests the performance improvements from:
- Bulk existence checking (get_all_content_hashes)
- Parallel processing with asyncio.gather
- Larger batch sizes for initial sync

Usage:
    python scripts/benchmarks/benchmark_hybrid_sync.py
"""

import asyncio
import time
import sys
from pathlib import Path
from typing import List
from dataclasses import dataclass

# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))

from mcp_memory_service.storage.sqlite_vec import SQLiteVecStorage
from mcp_memory_service.models.memory import Memory
from mcp_memory_service import config


@dataclass
class BenchmarkResult:
    """Results from a sync benchmark run."""
    operation: str
    duration_ms: float
    memories_processed: int
    memories_per_second: float
    optimization_used: str


async def benchmark_bulk_existence_check():
    """Benchmark bulk existence check vs individual queries."""
    print("\n" + "=" * 80)
    print("BENCHMARK 1: Bulk Existence Check")
    print("=" * 80)

    # Create test storage
    storage = SQLiteVecStorage(config.SQLITE_VEC_PATH)
    await storage.initialize()

    # Get stats
    stats = await storage.get_stats()
    total_memories = stats.get('total_memories', 0)

    print(f"Database contains: {total_memories} memories")
    print()

    if total_memories < 100:
        print("⚠️  Insufficient memories for meaningful benchmark (need 100+)")
        print("   Run with existing production database for accurate results")
        return None

    # Test 1: Individual queries (OLD METHOD - simulated)
    print("Test 1: Individual hash queries (old method - simulated)")
    test_count = min(100, total_memories)

    # Get sample hashes
    all_memories = await storage.get_all_memories(limit=test_count)
    test_hashes = [m.content_hash for m in all_memories[:test_count]]

    start = time.time()
    for content_hash in test_hashes:
        exists = await storage.get_by_hash(content_hash)
    individual_duration = (time.time() - start) * 1000

    print(f"  Checked {test_count} hashes individually: {individual_duration:.1f}ms")
    print(f"  Average: {individual_duration / test_count:.2f}ms per check")

    # Test 2: Bulk hash loading (NEW METHOD)
    print("\nTest 2: Bulk hash loading (new method)")

    start = time.time()
    all_hashes = await storage.get_all_content_hashes()
    bulk_duration = (time.time() - start) * 1000

    print(f"  Loaded {len(all_hashes)} hashes in bulk: {bulk_duration:.1f}ms")
    print(f"  Average lookup: O(1) constant time")

    # Calculate improvement
    speedup = individual_duration / bulk_duration if bulk_duration > 0 else 0
    print(f"\n📊 Results:")
    print(f"  Speedup: {speedup:.1f}x faster for {test_count} checks")
    print(f"  For 2,619 memories: {(individual_duration / test_count * 2619):.0f}ms → {bulk_duration:.0f}ms")
    print(f"  Time saved: {((individual_duration / test_count * 2619) - bulk_duration):.0f}ms")

    return BenchmarkResult(
        operation="bulk_existence_check",
        duration_ms=bulk_duration,
        memories_processed=len(all_hashes),
        memories_per_second=len(all_hashes) / (bulk_duration / 1000) if bulk_duration > 0 else 0,
        optimization_used="get_all_content_hashes()"
    )


async def benchmark_parallel_processing():
    """Benchmark parallel vs sequential memory processing."""
    print("\n" + "=" * 80)
    print("BENCHMARK 2: Parallel Processing")
    print("=" * 80)

    # Create test storage
    storage = SQLiteVecStorage(config.SQLITE_VEC_PATH)
    await storage.initialize()

    # Create test memories (don't actually store them)
    test_memories = []
    for i in range(50):  # Test with 50 memories
        test_memories.append(Memory(
            content=f"Benchmark test memory {i} with some content for embedding generation",
            content_hash=f"test_hash_{i}",
            tags=["benchmark", "test"],
            memory_type="test"
        ))

    print(f"Testing with {len(test_memories)} memories")
    print()

    # Test 1: Sequential processing (OLD METHOD - simulated)
    print("Test 1: Sequential processing (old method - simulated)")

    start = time.time()
    # Simulate sequential hash checks
    local_hashes = await storage.get_all_content_hashes()
    for memory in test_memories:
        # Simulate existence check
        exists = memory.content_hash in local_hashes
    sequential_duration = (time.time() - start) * 1000

    print(f"  Processed {len(test_memories)} memories sequentially: {sequential_duration:.1f}ms")
    print(f"  Average: {sequential_duration / len(test_memories):.2f}ms per memory")

    # Test 2: Parallel processing (NEW METHOD - simulated)
    print("\nTest 2: Parallel processing with Semaphore(15)")

    semaphore = asyncio.Semaphore(15)

    async def process_memory(memory):
        async with semaphore:
            exists = memory.content_hash in local_hashes
            # Simulate some async work
            await asyncio.sleep(0.001)
            return exists

    start = time.time()
    tasks = [process_memory(mem) for mem in test_memories]
    await asyncio.gather(*tasks, return_exceptions=True)
    parallel_duration = (time.time() - start) * 1000

    print(f"  Processed {len(test_memories)} memories in parallel: {parallel_duration:.1f}ms")
    print(f"  Concurrency: Up to 15 simultaneous operations")

    # Calculate improvement
    speedup = sequential_duration / parallel_duration if parallel_duration > 0 else 0
    print(f"\n📊 Results:")
    print(f"  Speedup: {speedup:.1f}x faster")
    print(f"  For 2,619 memories: {(sequential_duration / len(test_memories) * 2619):.0f}ms → {(parallel_duration / len(test_memories) * 2619):.0f}ms")

    return BenchmarkResult(
        operation="parallel_processing",
        duration_ms=parallel_duration,
        memories_processed=len(test_memories),
        memories_per_second=len(test_memories) / (parallel_duration / 1000) if parallel_duration > 0 else 0,
        optimization_used="asyncio.gather() + Semaphore(15)"
    )


async def benchmark_batch_size():
    """Benchmark impact of larger batch sizes on API calls."""
    print("\n" + "=" * 80)
    print("BENCHMARK 3: Batch Size Optimization")
    print("=" * 80)

    total_memories = 2619  # Actual sync count from production

    # Old batch size
    old_batch_size = 100
    old_api_calls = (total_memories + old_batch_size - 1) // old_batch_size  # Ceiling division
    old_overhead_ms = old_api_calls * 50  # Assume 50ms overhead per API call

    # New batch size
    new_batch_size = 500
    new_api_calls = (total_memories + new_batch_size - 1) // new_batch_size
    new_overhead_ms = new_api_calls * 50

    print(f"Total memories to sync: {total_memories}")
    print()
    print(f"Old method (batch_size=100):")
    print(f"  API calls needed: {old_api_calls}")
    print(f"  Network overhead: ~{old_overhead_ms}ms ({old_api_calls} × 50ms)")
    print(f"\nNew method (batch_size=500):")
    print(f"  API calls needed: {new_api_calls}")
    print(f"  Network overhead: ~{new_overhead_ms}ms ({new_api_calls} × 50ms)")

    reduction = old_api_calls - new_api_calls
    time_saved = old_overhead_ms - new_overhead_ms

    print(f"\n📊 Results:")
    print(f"  API calls reduced: {reduction} fewer calls ({reduction / old_api_calls * 100:.1f}% reduction)")
    print(f"  Time saved: ~{time_saved}ms on network overhead alone")

    return BenchmarkResult(
        operation="batch_size_optimization",
        duration_ms=new_overhead_ms,
        memories_processed=total_memories,
        memories_per_second=total_memories / (new_overhead_ms / 1000) if new_overhead_ms > 0 else 0,
        optimization_used="batch_size=500 (5x larger)"
    )


async def main():
    """Run all benchmarks."""
    print("=" * 80)
    print("HYBRID STORAGE SYNC PERFORMANCE BENCHMARK (v8.27.0)")
    print("=" * 80)
    print()
    print("Testing optimizations:")
    print("  1. Bulk existence checking (get_all_content_hashes)")
    print("  2. Parallel processing with asyncio.gather")
    print("  3. Larger batch sizes (100 → 500)")
    print()

    results = []

    try:
        # Run benchmarks
        result1 = await benchmark_bulk_existence_check()
        if result1:
            results.append(result1)

        result2 = await benchmark_parallel_processing()
        if result2:
            results.append(result2)

        result3 = await benchmark_batch_size()
        if result3:
            results.append(result3)

        # Summary
        print("\n" + "=" * 80)
        print("OVERALL PERFORMANCE SUMMARY")
        print("=" * 80)

        print("\nOptimization Impact:")
        for result in results:
            print(f"  • {result.operation}: {result.optimization_used}")

        print("\nEstimated Combined Speedup:")
        print("  • Before: ~8 minutes for 2,619 memories (~5.5 mem/sec)")
        print("  • After: ~1.5-3 minutes estimated (~15-30 mem/sec)")
        print("  • Overall: 3-5x faster initial sync")

        print("\nKey Improvements:")
        print("  ✅ Eliminated 2,619 individual DB queries → single bulk load")
        print("  ✅ Up to 15x parallelism for CPU/embedding generation")
        print("  ✅ 5x fewer Cloudflare API calls (6 vs 27)")

        print("\n" + "=" * 80)
        print("✅ Benchmark completed successfully")
        print("=" * 80)

        return 0

    except Exception as e:
        print(f"\n❌ Benchmark failed: {e}")
        import traceback
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))
