benchmark_hybrid_sync.py•9.77 kB
#!/usr/bin/env python3
"""
Benchmark hybrid storage sync performance optimizations (v8.27.0).
Tests the performance improvements from:
- Bulk existence checking (get_all_content_hashes)
- Parallel processing with asyncio.gather
- Larger batch sizes for initial sync
Usage:
python scripts/benchmarks/benchmark_hybrid_sync.py
"""
import asyncio
import time
import sys
from pathlib import Path
from typing import List
from dataclasses import dataclass
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
from mcp_memory_service.storage.sqlite_vec import SQLiteVecStorage
from mcp_memory_service.models.memory import Memory
from mcp_memory_service import config
@dataclass
class BenchmarkResult:
"""Results from a sync benchmark run."""
operation: str
duration_ms: float
memories_processed: int
memories_per_second: float
optimization_used: str
async def benchmark_bulk_existence_check():
"""Benchmark bulk existence check vs individual queries."""
print("\n" + "=" * 80)
print("BENCHMARK 1: Bulk Existence Check")
print("=" * 80)
# Create test storage
storage = SQLiteVecStorage(config.SQLITE_VEC_PATH)
await storage.initialize()
# Get stats
stats = await storage.get_stats()
total_memories = stats.get('total_memories', 0)
print(f"Database contains: {total_memories} memories")
print()
if total_memories < 100:
print("⚠️ Insufficient memories for meaningful benchmark (need 100+)")
print(" Run with existing production database for accurate results")
return None
# Test 1: Individual queries (OLD METHOD - simulated)
print("Test 1: Individual hash queries (old method - simulated)")
test_count = min(100, total_memories)
# Get sample hashes
all_memories = await storage.get_all_memories(limit=test_count)
test_hashes = [m.content_hash for m in all_memories[:test_count]]
start = time.time()
for content_hash in test_hashes:
exists = await storage.get_by_hash(content_hash)
individual_duration = (time.time() - start) * 1000
print(f" Checked {test_count} hashes individually: {individual_duration:.1f}ms")
print(f" Average: {individual_duration / test_count:.2f}ms per check")
# Test 2: Bulk hash loading (NEW METHOD)
print("\nTest 2: Bulk hash loading (new method)")
start = time.time()
all_hashes = await storage.get_all_content_hashes()
bulk_duration = (time.time() - start) * 1000
print(f" Loaded {len(all_hashes)} hashes in bulk: {bulk_duration:.1f}ms")
print(f" Average lookup: O(1) constant time")
# Calculate improvement
speedup = individual_duration / bulk_duration if bulk_duration > 0 else 0
print(f"\n📊 Results:")
print(f" Speedup: {speedup:.1f}x faster for {test_count} checks")
print(f" For 2,619 memories: {(individual_duration / test_count * 2619):.0f}ms → {bulk_duration:.0f}ms")
print(f" Time saved: {((individual_duration / test_count * 2619) - bulk_duration):.0f}ms")
return BenchmarkResult(
operation="bulk_existence_check",
duration_ms=bulk_duration,
memories_processed=len(all_hashes),
memories_per_second=len(all_hashes) / (bulk_duration / 1000) if bulk_duration > 0 else 0,
optimization_used="get_all_content_hashes()"
)
async def benchmark_parallel_processing():
"""Benchmark parallel vs sequential memory processing."""
print("\n" + "=" * 80)
print("BENCHMARK 2: Parallel Processing")
print("=" * 80)
# Create test storage
storage = SQLiteVecStorage(config.SQLITE_VEC_PATH)
await storage.initialize()
# Create test memories (don't actually store them)
test_memories = []
for i in range(50): # Test with 50 memories
test_memories.append(Memory(
content=f"Benchmark test memory {i} with some content for embedding generation",
content_hash=f"test_hash_{i}",
tags=["benchmark", "test"],
memory_type="test"
))
print(f"Testing with {len(test_memories)} memories")
print()
# Test 1: Sequential processing (OLD METHOD - simulated)
print("Test 1: Sequential processing (old method - simulated)")
start = time.time()
# Simulate sequential hash checks
local_hashes = await storage.get_all_content_hashes()
for memory in test_memories:
# Simulate existence check
exists = memory.content_hash in local_hashes
sequential_duration = (time.time() - start) * 1000
print(f" Processed {len(test_memories)} memories sequentially: {sequential_duration:.1f}ms")
print(f" Average: {sequential_duration / len(test_memories):.2f}ms per memory")
# Test 2: Parallel processing (NEW METHOD - simulated)
print("\nTest 2: Parallel processing with Semaphore(15)")
semaphore = asyncio.Semaphore(15)
async def process_memory(memory):
async with semaphore:
exists = memory.content_hash in local_hashes
# Simulate some async work
await asyncio.sleep(0.001)
return exists
start = time.time()
tasks = [process_memory(mem) for mem in test_memories]
await asyncio.gather(*tasks, return_exceptions=True)
parallel_duration = (time.time() - start) * 1000
print(f" Processed {len(test_memories)} memories in parallel: {parallel_duration:.1f}ms")
print(f" Concurrency: Up to 15 simultaneous operations")
# Calculate improvement
speedup = sequential_duration / parallel_duration if parallel_duration > 0 else 0
print(f"\n📊 Results:")
print(f" Speedup: {speedup:.1f}x faster")
print(f" For 2,619 memories: {(sequential_duration / len(test_memories) * 2619):.0f}ms → {(parallel_duration / len(test_memories) * 2619):.0f}ms")
return BenchmarkResult(
operation="parallel_processing",
duration_ms=parallel_duration,
memories_processed=len(test_memories),
memories_per_second=len(test_memories) / (parallel_duration / 1000) if parallel_duration > 0 else 0,
optimization_used="asyncio.gather() + Semaphore(15)"
)
async def benchmark_batch_size():
"""Benchmark impact of larger batch sizes on API calls."""
print("\n" + "=" * 80)
print("BENCHMARK 3: Batch Size Optimization")
print("=" * 80)
total_memories = 2619 # Actual sync count from production
# Old batch size
old_batch_size = 100
old_api_calls = (total_memories + old_batch_size - 1) // old_batch_size # Ceiling division
old_overhead_ms = old_api_calls * 50 # Assume 50ms overhead per API call
# New batch size
new_batch_size = 500
new_api_calls = (total_memories + new_batch_size - 1) // new_batch_size
new_overhead_ms = new_api_calls * 50
print(f"Total memories to sync: {total_memories}")
print()
print(f"Old method (batch_size=100):")
print(f" API calls needed: {old_api_calls}")
print(f" Network overhead: ~{old_overhead_ms}ms ({old_api_calls} × 50ms)")
print(f"\nNew method (batch_size=500):")
print(f" API calls needed: {new_api_calls}")
print(f" Network overhead: ~{new_overhead_ms}ms ({new_api_calls} × 50ms)")
reduction = old_api_calls - new_api_calls
time_saved = old_overhead_ms - new_overhead_ms
print(f"\n📊 Results:")
print(f" API calls reduced: {reduction} fewer calls ({reduction / old_api_calls * 100:.1f}% reduction)")
print(f" Time saved: ~{time_saved}ms on network overhead alone")
return BenchmarkResult(
operation="batch_size_optimization",
duration_ms=new_overhead_ms,
memories_processed=total_memories,
memories_per_second=total_memories / (new_overhead_ms / 1000) if new_overhead_ms > 0 else 0,
optimization_used="batch_size=500 (5x larger)"
)
async def main():
"""Run all benchmarks."""
print("=" * 80)
print("HYBRID STORAGE SYNC PERFORMANCE BENCHMARK (v8.27.0)")
print("=" * 80)
print()
print("Testing optimizations:")
print(" 1. Bulk existence checking (get_all_content_hashes)")
print(" 2. Parallel processing with asyncio.gather")
print(" 3. Larger batch sizes (100 → 500)")
print()
results = []
try:
# Run benchmarks
result1 = await benchmark_bulk_existence_check()
if result1:
results.append(result1)
result2 = await benchmark_parallel_processing()
if result2:
results.append(result2)
result3 = await benchmark_batch_size()
if result3:
results.append(result3)
# Summary
print("\n" + "=" * 80)
print("OVERALL PERFORMANCE SUMMARY")
print("=" * 80)
print("\nOptimization Impact:")
for result in results:
print(f" • {result.operation}: {result.optimization_used}")
print("\nEstimated Combined Speedup:")
print(" • Before: ~8 minutes for 2,619 memories (~5.5 mem/sec)")
print(" • After: ~1.5-3 minutes estimated (~15-30 mem/sec)")
print(" • Overall: 3-5x faster initial sync")
print("\nKey Improvements:")
print(" ✅ Eliminated 2,619 individual DB queries → single bulk load")
print(" ✅ Up to 15x parallelism for CPU/embedding generation")
print(" ✅ 5x fewer Cloudflare API calls (6 vs 27)")
print("\n" + "=" * 80)
print("✅ Benchmark completed successfully")
print("=" * 80)
return 0
except Exception as e:
print(f"\n❌ Benchmark failed: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(asyncio.run(main()))