cas.py (13.2 kB)
"""Content-addressable storage (CAS) implementation""" import hashlib import asyncio import json import zstandard as zstd from pathlib import Path from typing import Optional, Dict, Any, List, Union, Tuple from datetime import datetime import aiofiles import aiofiles.os from dataclasses import dataclass, field from ..utils.logging import get_logger from ..utils.errors import StorageError, ValidationError logger = get_logger(__name__) @dataclass class CASObject: """Object stored in CAS""" hash: str size: int compressed_size: int created_at: datetime metadata: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary""" return { "hash": self.hash, "size": self.size, "compressed_size": self.compressed_size, "created_at": self.created_at.isoformat(), "metadata": self.metadata } @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'CASObject': """Create from dictionary""" return cls( hash=data["hash"], size=data["size"], compressed_size=data["compressed_size"], created_at=datetime.fromisoformat(data["created_at"]), metadata=data.get("metadata", {}) ) class ContentAddressableStorage: """Content-addressable storage with zstd compression Stores objects by their content hash (SHA-256) with automatic deduplication and compression. """ def __init__(self, storage_path: Path, compression_level: int = 3): """Initialize CAS Args: storage_path: Base path for storage compression_level: Zstd compression level (1-22, default 3) """ self.storage_path = Path(storage_path) self.objects_path = self.storage_path / "objects" self.index_path = self.storage_path / "index.json" self.compression_level = compression_level # In-memory index self._index: Dict[str, CASObject] = {} self._index_lock = asyncio.Lock() # Compression context self._compressor = zstd.ZstdCompressor(level=compression_level) self._decompressor = zstd.ZstdDecompressor() # Statistics self._stats = { "objects_stored": 0, "total_size": 0, "compressed_size": 0, "dedup_hits": 0, "compression_ratio": 0.0 } async def initialize(self) -> None: """Initialize storage""" # Create directories await aiofiles.os.makedirs(self.objects_path, exist_ok=True) # Load index await self._load_index() logger.info( "cas_initialized", path=str(self.storage_path), objects=len(self._index), compression_level=self.compression_level ) async def store(self, data: bytes, metadata: Optional[Dict[str, Any]] = None) -> str: """Store data in CAS Args: data: Raw data to store metadata: Optional metadata Returns: Content hash (SHA-256) """ # Calculate hash content_hash = hashlib.sha256(data).hexdigest() # Check if already exists async with self._index_lock: if content_hash in self._index: self._stats["dedup_hits"] += 1 logger.debug(f"CAS dedup hit: {content_hash}") return content_hash # Compress data compressed_data = self._compressor.compress(data) # Get object path (use first 2 chars as directory for sharding) object_dir = self.objects_path / content_hash[:2] object_path = object_dir / content_hash[2:] # Create directory await aiofiles.os.makedirs(object_dir, exist_ok=True) # Write compressed data async with aiofiles.open(object_path, 'wb') as f: await f.write(compressed_data) # Create CAS object cas_object = CASObject( hash=content_hash, size=len(data), compressed_size=len(compressed_data), created_at=datetime.utcnow(), metadata=metadata or {} ) # Update index async with self._index_lock: self._index[content_hash] = cas_object await self._save_index() # Update stats self._stats["objects_stored"] += 1 self._stats["total_size"] += 
len(data) self._stats["compressed_size"] += len(compressed_data) self._update_compression_ratio() logger.debug( "cas_object_stored", hash=content_hash, size=len(data), compressed_size=len(compressed_data), compression_ratio=len(compressed_data) / len(data) ) return content_hash async def retrieve(self, content_hash: str) -> Optional[bytes]: """Retrieve data from CAS Args: content_hash: Content hash to retrieve Returns: Raw data or None if not found """ # Check index async with self._index_lock: if content_hash not in self._index: return None # Get object path object_path = self.objects_path / content_hash[:2] / content_hash[2:] # Check if file exists if not await aiofiles.os.path.exists(object_path): logger.warning(f"CAS object file missing: {content_hash}") # Remove from index async with self._index_lock: self._index.pop(content_hash, None) await self._save_index() return None # Read compressed data async with aiofiles.open(object_path, 'rb') as f: compressed_data = await f.read() # Decompress try: data = self._decompressor.decompress(compressed_data) # Verify hash actual_hash = hashlib.sha256(data).hexdigest() if actual_hash != content_hash: raise StorageError(f"Hash mismatch: expected {content_hash}, got {actual_hash}") return data except Exception as e: logger.error(f"Failed to decompress CAS object {content_hash}: {e}") raise StorageError(f"Failed to retrieve object: {e}") async def exists(self, content_hash: str) -> bool: """Check if object exists""" async with self._index_lock: return content_hash in self._index async def get_object(self, content_hash: str) -> Optional[CASObject]: """Get CAS object metadata""" async with self._index_lock: return self._index.get(content_hash) async def delete(self, content_hash: str) -> bool: """Delete object from CAS Args: content_hash: Content hash to delete Returns: True if deleted, False if not found """ async with self._index_lock: cas_object = self._index.pop(content_hash, None) if not cas_object: return False # Delete file object_path = self.objects_path / content_hash[:2] / content_hash[2:] try: await aiofiles.os.remove(object_path) # Try to remove empty directory try: await aiofiles.os.rmdir(object_path.parent) except: pass # Directory not empty except Exception as e: logger.error(f"Failed to delete CAS object file {content_hash}: {e}") # Update index async with self._index_lock: await self._save_index() # Update stats self._stats["objects_stored"] -= 1 self._stats["total_size"] -= cas_object.size self._stats["compressed_size"] -= cas_object.compressed_size self._update_compression_ratio() logger.debug(f"CAS object deleted: {content_hash}") return True async def list_objects(self, prefix: Optional[str] = None) -> List[CASObject]: """List all objects or objects with hash prefix""" async with self._index_lock: if prefix: return [ obj for hash, obj in self._index.items() if hash.startswith(prefix) ] else: return list(self._index.values()) async def gc(self, keep_hashes: Optional[List[str]] = None) -> Tuple[int, int]: """Garbage collect unreferenced objects Args: keep_hashes: List of hashes to keep (if None, keeps all) Returns: Tuple of (objects_removed, bytes_freed) """ if keep_hashes is None: return 0, 0 keep_set = set(keep_hashes) objects_removed = 0 bytes_freed = 0 # Find objects to remove async with self._index_lock: to_remove = [ hash for hash in self._index.keys() if hash not in keep_set ] # Remove objects for content_hash in to_remove: cas_object = await self.get_object(content_hash) if cas_object and await self.delete(content_hash): 
objects_removed += 1 bytes_freed += cas_object.compressed_size logger.info( "cas_gc_completed", objects_removed=objects_removed, bytes_freed=bytes_freed ) return objects_removed, bytes_freed async def verify_integrity(self) -> List[str]: """Verify integrity of all objects Returns: List of corrupted object hashes """ corrupted = [] async with self._index_lock: hashes = list(self._index.keys()) for content_hash in hashes: try: data = await self.retrieve(content_hash) if not data: corrupted.append(content_hash) except Exception as e: logger.error(f"Integrity check failed for {content_hash}: {e}") corrupted.append(content_hash) if corrupted: logger.warning( "cas_integrity_check_failed", corrupted_count=len(corrupted), total_count=len(hashes) ) else: logger.info( "cas_integrity_check_passed", total_count=len(hashes) ) return corrupted def get_stats(self) -> Dict[str, Any]: """Get storage statistics""" return { **self._stats, "object_count": len(self._index) } async def _load_index(self) -> None: """Load index from disk""" if not await aiofiles.os.path.exists(self.index_path): return try: async with aiofiles.open(self.index_path, 'r') as f: data = await f.read() index_data = json.loads(data) self._index = { hash: CASObject.from_dict(obj_data) for hash, obj_data in index_data.items() } # Recalculate stats self._stats["objects_stored"] = len(self._index) self._stats["total_size"] = sum(obj.size for obj in self._index.values()) self._stats["compressed_size"] = sum(obj.compressed_size for obj in self._index.values()) self._update_compression_ratio() except Exception as e: logger.error(f"Failed to load CAS index: {e}") self._index = {} async def _save_index(self) -> None: """Save index to disk""" index_data = { hash: obj.to_dict() for hash, obj in self._index.items() } # Write to temporary file first temp_path = self.index_path.with_suffix('.tmp') async with aiofiles.open(temp_path, 'w') as f: await f.write(json.dumps(index_data, indent=2)) # Atomic rename await aiofiles.os.rename(temp_path, self.index_path) def _update_compression_ratio(self) -> None: """Update compression ratio statistic""" if self._stats["total_size"] > 0: self._stats["compression_ratio"] = ( self._stats["compressed_size"] / self._stats["total_size"] ) else: self._stats["compression_ratio"] = 0.0
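For orientation, here is a minimal usage sketch of the class above. It is not part of the file: the import path shannon_mcp.storage.cas is an assumption (the module's relative imports only resolve inside its parent package), and the temporary directory and example payload are purely illustrative.

import asyncio
import tempfile
from pathlib import Path

# Assumed import path -- adjust to wherever cas.py lives in the package.
from shannon_mcp.storage.cas import ContentAddressableStorage


async def main() -> None:
    # Use a throwaway directory as the storage root for this example.
    storage_dir = Path(tempfile.mkdtemp())
    cas = ContentAddressableStorage(storage_dir, compression_level=3)
    await cas.initialize()

    # Storing the same bytes twice returns the same hash and counts a dedup hit.
    h1 = await cas.store(b"hello world", metadata={"source": "example"})
    h2 = await cas.store(b"hello world")
    assert h1 == h2

    # Retrieval decompresses the object and re-verifies its SHA-256 before returning.
    data = await cas.retrieve(h1)
    assert data == b"hello world"

    # gc() deletes everything not in keep_hashes; here the only object is kept.
    removed, freed = await cas.gc(keep_hashes=[h1])
    print(cas.get_stats(), removed, freed)


asyncio.run(main())

Two design details worth noting: objects are sharded into subdirectories named after the first two hex characters of the hash, which keeps any single directory from growing unboundedly, and the JSON index is written to a .tmp file and then renamed, so an interrupted save cannot leave a truncated index behind.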
