Skip to main content
Glama
benchmark_cas.py (14.5 kB)
""" Performance benchmarks for Content-Addressable Storage (CAS). """ import pytest import asyncio import time import hashlib import random from pathlib import Path from typing import List, Tuple import statistics from shannon_mcp.storage.cas import ContentAddressableStorage from tests.utils.performance import PerformanceTimer, PerformanceMonitor class BenchmarkCASPerformance: """Benchmark CAS performance.""" @pytest.mark.benchmark @pytest.mark.asyncio async def test_cas_write_performance(self, benchmark, temp_dir): """Benchmark CAS write performance.""" cas = ContentAddressableStorage(temp_dir / "cas") await cas.initialize() # Test different content sizes sizes = [ (1, "1KB"), (10, "10KB"), (100, "100KB"), (1024, "1MB"), (10240, "10MB") ] results = {} for size_kb, label in sizes: content_size = size_kb * 1024 content = self._generate_random_content(content_size) # Benchmark writes write_times = [] for i in range(10): # Generate unique content each time test_content = content + f"_{i}".encode() start = time.perf_counter() content_hash = await cas.store(test_content) duration = time.perf_counter() - start write_times.append(duration) avg_time = statistics.mean(write_times) throughput_mb_s = (content_size / (1024 * 1024)) / avg_time results[label] = { "avg_write_time": avg_time, "throughput_MB/s": throughput_mb_s, "operations_per_sec": 1 / avg_time } await cas.close() # Performance assertions assert results["1KB"]["operations_per_sec"] > 1000 # >1000 ops/s for small files assert results["1MB"]["throughput_MB/s"] > 50 # >50 MB/s for 1MB files return results @pytest.mark.benchmark @pytest.mark.asyncio async def test_cas_read_performance(self, benchmark, temp_dir): """Benchmark CAS read performance.""" cas = ContentAddressableStorage(temp_dir / "cas") await cas.initialize() # Pre-store content test_data = [] sizes = [1024, 10240, 102400] # 1KB, 10KB, 100KB for size in sizes: content = self._generate_random_content(size) content_hash = await cas.store(content) 
test_data.append((content_hash, size)) # Benchmark reads results = {} for content_hash, size in test_data: read_times = [] for _ in range(100): start = time.perf_counter() content = await cas.retrieve(content_hash) duration = time.perf_counter() - start read_times.append(duration) avg_time = statistics.mean(read_times) throughput_mb_s = (size / (1024 * 1024)) / avg_time size_label = f"{size // 1024}KB" results[size_label] = { "avg_read_time": avg_time, "throughput_MB/s": throughput_mb_s, "operations_per_sec": 1 / avg_time } await cas.close() # Performance assertions assert results["1KB"]["operations_per_sec"] > 5000 # >5000 ops/s for small files assert results["100KB"]["throughput_MB/s"] > 100 # >100 MB/s for 100KB files return results @pytest.mark.benchmark @pytest.mark.asyncio async def test_cas_compression_performance(self, benchmark, temp_dir): """Benchmark compression performance.""" cas = ContentAddressableStorage( temp_dir / "cas", compression_enabled=True, compression_level=3 ) await cas.initialize() # Test compressible vs incompressible data test_cases = [ ("highly_compressible", self._generate_compressible_content(1024 * 1024)), ("normal_text", self._generate_text_content(1024 * 1024)), ("random_data", self._generate_random_content(1024 * 1024)) ] results = {} for data_type, content in test_cases: # Benchmark compression start = time.perf_counter() content_hash = await cas.store(content) store_duration = time.perf_counter() - start # Get compression ratio info = await cas.get_info(content_hash) compression_ratio = info["original_size"] / info["compressed_size"] # Benchmark decompression start = time.perf_counter() retrieved = await cas.retrieve(content_hash) retrieve_duration = time.perf_counter() - start results[data_type] = { "store_time": store_duration, "retrieve_time": retrieve_duration, "compression_ratio": compression_ratio, "store_throughput_MB/s": 1.0 / store_duration, "retrieve_throughput_MB/s": 1.0 / retrieve_duration } await cas.close() # 
Compression should be effective for compressible data assert results["highly_compressible"]["compression_ratio"] > 5.0 assert results["normal_text"]["compression_ratio"] > 1.5 return results @pytest.mark.benchmark @pytest.mark.asyncio async def test_cas_concurrent_operations(self, benchmark, temp_dir): """Benchmark concurrent CAS operations.""" cas = ContentAddressableStorage(temp_dir / "cas") await cas.initialize() # Prepare test data concurrent_counts = [10, 50, 100] content_size = 10240 # 10KB results = {} for count in concurrent_counts: contents = [ self._generate_random_content(content_size) for _ in range(count) ] # Benchmark concurrent writes start = time.perf_counter() write_tasks = [ cas.store(content) for content in contents ] hashes = await asyncio.gather(*write_tasks) write_duration = time.perf_counter() - start # Benchmark concurrent reads start = time.perf_counter() read_tasks = [ cas.retrieve(content_hash) for content_hash in hashes ] retrieved = await asyncio.gather(*read_tasks) read_duration = time.perf_counter() - start results[f"{count}_concurrent"] = { "write_duration": write_duration, "read_duration": read_duration, "write_ops_per_sec": count / write_duration, "read_ops_per_sec": count / read_duration, "write_throughput_MB/s": (count * content_size) / (write_duration * 1024 * 1024), "read_throughput_MB/s": (count * content_size) / (read_duration * 1024 * 1024) } await cas.close() # Should scale well with concurrency assert results["100_concurrent"]["write_ops_per_sec"] > 100 assert results["100_concurrent"]["read_ops_per_sec"] > 500 return results @pytest.mark.benchmark @pytest.mark.asyncio async def test_cas_deduplication_performance(self, benchmark, temp_dir): """Benchmark deduplication performance.""" cas = ContentAddressableStorage(temp_dir / "cas") await cas.initialize() # Create content with duplicates unique_contents = 100 duplicate_factor = 10 content_size = 10240 # 10KB contents = [] for i in range(unique_contents): content = 
self._generate_random_content(content_size) # Add multiple copies for _ in range(duplicate_factor): contents.append(content) # Shuffle to simulate real-world order random.shuffle(contents) # Benchmark storing with deduplication start = time.perf_counter() for content in contents: await cas.store(content) duration = time.perf_counter() - start # Check storage efficiency stats = await cas.get_stats() expected_size = unique_contents * content_size actual_size = stats["total_size"] space_saved = (len(contents) * content_size) - actual_size dedup_ratio = len(contents) / stats["total_objects"] results = { "total_operations": len(contents), "unique_objects": stats["total_objects"], "duration": duration, "ops_per_sec": len(contents) / duration, "deduplication_ratio": dedup_ratio, "space_saved_MB": space_saved / (1024 * 1024), "storage_efficiency": 1 - (actual_size / (len(contents) * content_size)) } await cas.close() # Deduplication should be effective assert results["deduplication_ratio"] >= duplicate_factor * 0.9 # Allow small variance assert results["storage_efficiency"] > 0.85 return results def _generate_random_content(self, size: int) -> bytes: """Generate random binary content.""" return bytes(random.getrandbits(8) for _ in range(size)) def _generate_compressible_content(self, size: int) -> bytes: """Generate highly compressible content.""" pattern = b"ABCDEFGHIJ" * 100 repetitions = size // len(pattern) return pattern * repetitions + pattern[:size % len(pattern)] def _generate_text_content(self, size: int) -> bytes: """Generate text-like content.""" words = ["the", "quick", "brown", "fox", "jumps", "over", "lazy", "dog", "\n"] text = "" while len(text.encode()) < size: text += " ".join(random.choices(words, k=10)) + "\n" return text.encode()[:size] class BenchmarkCASScalability: """Benchmark CAS scalability.""" @pytest.mark.benchmark @pytest.mark.asyncio async def test_cas_large_file_handling(self, benchmark, temp_dir): """Benchmark large file handling.""" cas = 
ContentAddressableStorage( temp_dir / "cas", compression_enabled=True ) await cas.initialize() # Test progressively larger files file_sizes_mb = [1, 10, 50, 100] results = {} for size_mb in file_sizes_mb: if size_mb > 50: # Skip very large files in CI continue content = self._generate_random_content(size_mb * 1024 * 1024) # Benchmark store start = time.perf_counter() content_hash = await cas.store(content) store_duration = time.perf_counter() - start # Benchmark retrieve start = time.perf_counter() retrieved = await cas.retrieve(content_hash) retrieve_duration = time.perf_counter() - start results[f"{size_mb}MB"] = { "store_duration": store_duration, "retrieve_duration": retrieve_duration, "store_throughput_MB/s": size_mb / store_duration, "retrieve_throughput_MB/s": size_mb / retrieve_duration } await cas.close() # Should maintain reasonable performance for large files if "10MB" in results: assert results["10MB"]["store_throughput_MB/s"] > 10 assert results["10MB"]["retrieve_throughput_MB/s"] > 20 return results @pytest.mark.benchmark @pytest.mark.asyncio async def test_cas_many_objects_performance(self, benchmark, temp_dir): """Benchmark CAS with many objects.""" cas = ContentAddressableStorage(temp_dir / "cas") await cas.initialize() # Store many small objects object_counts = [1000, 5000, 10000] object_size = 1024 # 1KB results = {} for count in object_counts: # Generate unique contents contents = [ f"Object {i} content".encode() + self._generate_random_content(object_size - 20) for i in range(count) ] # Benchmark bulk store start = time.perf_counter() hashes = [] for content in contents: content_hash = await cas.store(content) hashes.append(content_hash) store_duration = time.perf_counter() - start # Benchmark random access random_hashes = random.sample(hashes, min(100, count)) start = time.perf_counter() for content_hash in random_hashes: await cas.retrieve(content_hash) access_duration = time.perf_counter() - start results[f"{count}_objects"] = { 
"store_duration": store_duration, "store_ops_per_sec": count / store_duration, "random_access_duration": access_duration, "random_access_ops_per_sec": len(random_hashes) / access_duration } await cas.close() # Should scale well with many objects assert results["1000_objects"]["store_ops_per_sec"] > 500 assert results["1000_objects"]["random_access_ops_per_sec"] > 1000 return results def _generate_random_content(self, size: int) -> bytes: """Generate random binary content.""" return bytes(random.getrandbits(8) for _ in range(size))

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krzemienski/shannon-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.