Skip to main content
Glama

CodeGraph CLI MCP Server

by Jakedismo
persistent.rs37.3 kB
use async_trait::async_trait; use codegraph_core::{CodeGraphError, CodeNode, NodeId, Result, VectorStore}; use parking_lot::{Mutex, RwLock}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs::{File, OpenOptions}; use std::io::{Read, Seek, SeekFrom, Write}; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::fs; use tracing::{debug, error, info, warn}; /// Header for the memory-mapped vector storage file #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StorageHeader { /// Version of the storage format pub version: u32, /// Dimension of vectors pub dimension: usize, /// Total number of vectors stored pub vector_count: u64, /// Offset to the vector data section pub vectors_offset: u64, /// Offset to the metadata section pub metadata_offset: u64, /// Offset to the index mapping section pub index_mapping_offset: u64, /// Timestamp of last modification pub last_modified: u64, /// Checksum for integrity verification pub checksum: u64, /// Compression type used pub compression_type: CompressionType, /// Whether incremental updates are enabled pub incremental_enabled: bool, } impl Default for StorageHeader { fn default() -> Self { Self { version: 1, dimension: 0, vector_count: 0, vectors_offset: 0, metadata_offset: 0, index_mapping_offset: 0, last_modified: 0, checksum: 0, compression_type: CompressionType::None, incremental_enabled: true, } } } /// Compression techniques for vector storage #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum CompressionType { /// No compression None, /// Product Quantization ProductQuantization { /// Number of subquantizers m: usize, /// Number of bits per subquantizer nbits: u32, }, /// Scalar Quantization ScalarQuantization { /// Number of bits per scalar nbits: u32, /// Use uniform quantization uniform: bool, }, } /// Metadata for a stored vector #[derive(Debug, Clone, Serialize, Deserialize)] pub struct VectorMetadata { /// Original node ID pub node_id: NodeId, /// Internal vector ID pub vector_id: u64, /// Timestamp when stored pub timestamp: u64, /// Original vector norm (for reconstruction) pub norm: f32, /// Whether this vector is compressed pub compressed: bool, /// Size of the compressed vector in bytes pub compressed_size: usize, } /// Incremental update log entry #[derive(Debug, Clone, Serialize, Deserialize)] pub struct UpdateLogEntry { /// Type of operation pub operation: UpdateOperation, /// Node ID affected pub node_id: NodeId, /// Vector ID (for updates/deletes) pub vector_id: Option<u64>, /// Timestamp of operation pub timestamp: u64, /// Vector data (for inserts/updates) pub vector_data: Option<Vec<f32>>, } #[derive(Debug, Clone, Serialize, Deserialize)] pub enum UpdateOperation { Insert, Update, Delete, } /// Product Quantizer for vector compression #[derive(Debug, Clone)] pub struct ProductQuantizer { /// Number of subquantizers m: usize, /// Dimension of each subquantizer dsub: usize, /// Number of bits per subquantizer nbits: u32, /// Number of centroids per subquantizer ksub: usize, /// Centroids for each subquantizer [m][ksub][dsub] centroids: Vec<Vec<Vec<f32>>>, /// Whether the quantizer is trained trained: bool, } impl ProductQuantizer { pub fn new(dimension: usize, m: usize, nbits: u32) -> Result<Self> { if dimension % m != 0 { return Err(CodeGraphError::Vector( "Dimension must be divisible by number of subquantizers".to_string(), )); } let dsub = dimension / m; let ksub = 1 << nbits; Ok(Self { m, dsub, nbits, ksub, centroids: vec![vec![vec![0.0; dsub]; ksub]; m], trained: false, }) } /// Train the quantizer on a set of vectors pub fn train(&mut self, vectors: &[Vec<f32>]) -> Result<()> { if vectors.is_empty() { return Err(CodeGraphError::Vector( "Cannot train on empty vector set".to_string(), )); } let dimension = vectors[0].len(); if dimension != self.m * self.dsub { return Err(CodeGraphError::Vector( "Vector dimension mismatch".to_string(), )); } // Train each subquantizer independently using k-means for sub_idx in 0..self.m { let start_dim = sub_idx * self.dsub; let end_dim = start_dim + self.dsub; // Extract subvectors for this subquantizer let subvectors: Vec<Vec<f32>> = vectors .iter() .map(|v| v[start_dim..end_dim].to_vec()) .collect(); // Run k-means clustering self.centroids[sub_idx] = self.kmeans_clustering(&subvectors, self.ksub)?; } self.trained = true; debug!("Product quantizer trained with {} subquantizers", self.m); Ok(()) } /// Encode a vector using product quantization pub fn encode(&self, vector: &[f32]) -> Result<Vec<u8>> { if !self.trained { return Err(CodeGraphError::Vector("Quantizer not trained".to_string())); } let mut codes = Vec::with_capacity(self.m); for sub_idx in 0..self.m { let start_dim = sub_idx * self.dsub; let end_dim = start_dim + self.dsub; let subvector = &vector[start_dim..end_dim]; // Find nearest centroid let mut best_idx = 0; let mut best_dist = f32::INFINITY; for (centroid_idx, centroid) in self.centroids[sub_idx].iter().enumerate() { let dist = self.euclidean_distance(subvector, centroid); if dist < best_dist { best_dist = dist; best_idx = centroid_idx; } } codes.push(best_idx as u8); } Ok(codes) } /// Decode a vector from its quantized representation pub fn decode(&self, codes: &[u8]) -> Result<Vec<f32>> { if !self.trained { return Err(CodeGraphError::Vector("Quantizer not trained".to_string())); } if codes.len() != self.m { return Err(CodeGraphError::Vector("Invalid code length".to_string())); } let mut decoded = Vec::with_capacity(self.m * self.dsub); for (sub_idx, &code) in codes.iter().enumerate() { let centroid_idx = code as usize; if centroid_idx >= self.ksub { return Err(CodeGraphError::Vector("Invalid centroid index".to_string())); } decoded.extend_from_slice(&self.centroids[sub_idx][centroid_idx]); } Ok(decoded) } /// Simple k-means clustering implementation fn kmeans_clustering(&self, vectors: &[Vec<f32>], k: usize) -> Result<Vec<Vec<f32>>> { if vectors.is_empty() || k == 0 { return Err(CodeGraphError::Vector( "Invalid clustering parameters".to_string(), )); } let dimension = vectors[0].len(); let mut centroids = Vec::with_capacity(k); // Initialize centroids randomly from input vectors for i in 0..k { let idx = i % vectors.len(); centroids.push(vectors[idx].clone()); } // Run k-means iterations for _iteration in 0..50 { // Maximum 50 iterations let mut assignments = vec![0; vectors.len()]; let mut changed = false; // Assign vectors to nearest centroids for (vec_idx, vector) in vectors.iter().enumerate() { let mut best_centroid = 0; let mut best_dist = f32::INFINITY; for (centroid_idx, centroid) in centroids.iter().enumerate() { let dist = self.euclidean_distance(vector, centroid); if dist < best_dist { best_dist = dist; best_centroid = centroid_idx; } } if assignments[vec_idx] != best_centroid { changed = true; } assignments[vec_idx] = best_centroid; } // Update centroids for centroid_idx in 0..k { let assigned_vectors: Vec<&Vec<f32>> = vectors .iter() .enumerate() .filter(|(idx, _)| assignments[*idx] == centroid_idx) .map(|(_, vec)| vec) .collect(); if !assigned_vectors.is_empty() { let mut new_centroid = vec![0.0; dimension]; for vector in assigned_vectors.iter() { for (i, &val) in vector.iter().enumerate() { new_centroid[i] += val; } } let count = assigned_vectors.len() as f32; for val in new_centroid.iter_mut() { *val /= count; } centroids[centroid_idx] = new_centroid; } } if !changed { break; } } Ok(centroids) } fn euclidean_distance(&self, a: &[f32], b: &[f32]) -> f32 { a.iter() .zip(b.iter()) .map(|(x, y)| (x - y).powi(2)) .sum::<f32>() .sqrt() } } /// Scalar Quantizer for vector compression #[derive(Debug, Clone)] pub struct ScalarQuantizer { /// Number of bits per scalar nbits: u32, /// Quantization parameters per dimension scales: Vec<f32>, /// Bias parameters per dimension biases: Vec<f32>, /// Whether to use uniform quantization uniform: bool, /// Whether the quantizer is trained trained: bool, } impl ScalarQuantizer { pub fn new(dimension: usize, nbits: u32, uniform: bool) -> Self { Self { nbits, scales: vec![1.0; dimension], biases: vec![0.0; dimension], uniform, trained: false, } } pub fn train(&mut self, vectors: &[Vec<f32>]) -> Result<()> { if vectors.is_empty() { return Err(CodeGraphError::Vector( "Cannot train on empty vector set".to_string(), )); } let dimension = vectors[0].len(); self.scales = vec![1.0; dimension]; self.biases = vec![0.0; dimension]; if self.uniform { // Uniform quantization: find global min/max let mut global_min = f32::INFINITY; let mut global_max = f32::NEG_INFINITY; for vector in vectors { for &val in vector { global_min = global_min.min(val); global_max = global_max.max(val); } } let range = global_max - global_min; let scale = (1 << self.nbits) as f32 / range; for i in 0..dimension { self.scales[i] = scale; self.biases[i] = global_min; } } else { // Non-uniform quantization: per-dimension min/max for dim in 0..dimension { let mut dim_min = f32::INFINITY; let mut dim_max = f32::NEG_INFINITY; for vector in vectors { let val = vector[dim]; dim_min = dim_min.min(val); dim_max = dim_max.max(val); } let range = dim_max - dim_min; if range > 0.0 { self.scales[dim] = (1 << self.nbits) as f32 / range; self.biases[dim] = dim_min; } } } self.trained = true; debug!("Scalar quantizer trained with {} bits", self.nbits); Ok(()) } pub fn encode(&self, vector: &[f32]) -> Result<Vec<u8>> { if !self.trained { return Err(CodeGraphError::Vector("Quantizer not trained".to_string())); } let max_val = (1 << self.nbits) - 1; let mut encoded = Vec::with_capacity(vector.len() * ((self.nbits + 7) / 8) as usize); for (i, &val) in vector.iter().enumerate() { let normalized = (val - self.biases[i]) * self.scales[i]; let quantized = (normalized.max(0.0).min(max_val as f32)) as u32; // Pack bits efficiently match self.nbits { 8 => encoded.push(quantized as u8), 16 => { encoded.extend_from_slice(&(quantized as u16).to_le_bytes()); } _ => { // For other bit sizes, use u32 encoded.extend_from_slice(&quantized.to_le_bytes()); } } } Ok(encoded) } pub fn decode(&self, encoded: &[u8]) -> Result<Vec<f32>> { if !self.trained { return Err(CodeGraphError::Vector("Quantizer not trained".to_string())); } let dimension = self.scales.len(); let mut decoded = Vec::with_capacity(dimension); let bytes_per_val = match self.nbits { 8 => 1, 16 => 2, _ => 4, }; for i in 0..dimension { let start_idx = i * bytes_per_val; if start_idx + bytes_per_val > encoded.len() { return Err(CodeGraphError::Vector( "Insufficient encoded data".to_string(), )); } let quantized = match self.nbits { 8 => encoded[start_idx] as u32, 16 => u16::from_le_bytes([encoded[start_idx], encoded[start_idx + 1]]) as u32, _ => u32::from_le_bytes([ encoded[start_idx], encoded[start_idx + 1], encoded[start_idx + 2], encoded[start_idx + 3], ]), }; let normalized = quantized as f32 / self.scales[i] + self.biases[i]; decoded.push(normalized); } Ok(decoded) } } /// Persistent vector storage with memory mapping and compression pub struct PersistentVectorStore { /// Storage file path storage_path: PathBuf, /// Backup directory path backup_path: PathBuf, /// Update log path log_path: PathBuf, /// Storage header header: Arc<RwLock<StorageHeader>>, /// Vector metadata mapping metadata: Arc<RwLock<HashMap<NodeId, VectorMetadata>>>, /// Reverse mapping from vector ID to node ID vector_id_mapping: Arc<RwLock<HashMap<u64, NodeId>>>, /// Update log for incremental changes update_log: Arc<Mutex<Vec<UpdateLogEntry>>>, /// Product quantizer for compression pq_quantizer: Arc<RwLock<Option<ProductQuantizer>>>, /// Scalar quantizer for compression sq_quantizer: Arc<RwLock<Option<ScalarQuantizer>>>, /// Next vector ID next_vector_id: Arc<RwLock<u64>>, /// Memory-mapped file handle storage_file: Arc<Mutex<Option<File>>>, } impl PersistentVectorStore { pub fn new<P: AsRef<Path>>(storage_path: P, backup_path: P, dimension: usize) -> Result<Self> { let storage_path = storage_path.as_ref().to_path_buf(); let backup_path = backup_path.as_ref().to_path_buf(); let log_path = storage_path.with_extension("log"); // Ensure directories exist if let Some(parent) = storage_path.parent() { std::fs::create_dir_all(parent)?; } std::fs::create_dir_all(&backup_path)?; let mut header = StorageHeader::default(); header.dimension = dimension; let store = Self { storage_path, backup_path, log_path, header: Arc::new(RwLock::new(header)), metadata: Arc::new(RwLock::new(HashMap::new())), vector_id_mapping: Arc::new(RwLock::new(HashMap::new())), update_log: Arc::new(Mutex::new(Vec::new())), pq_quantizer: Arc::new(RwLock::new(None)), sq_quantizer: Arc::new(RwLock::new(None)), next_vector_id: Arc::new(RwLock::new(0)), storage_file: Arc::new(Mutex::new(None)), }; // Try to load existing storage if store.storage_path.exists() { if let Err(e) = store.load_from_disk() { warn!("Failed to load existing storage, creating new: {}", e); store.initialize_storage()?; } } else { store.initialize_storage()?; } Ok(store) } /// Initialize empty storage file fn initialize_storage(&self) -> Result<()> { let mut file = OpenOptions::new() .create(true) .write(true) .truncate(true) .open(&self.storage_path)?; // Write header let header = self.header.read(); let header_bytes = bincode::serialize(&*header).map_err(|e| CodeGraphError::Vector(e.to_string()))?; file.write_all(&(header_bytes.len() as u64).to_le_bytes())?; file.write_all(&header_bytes)?; file.flush()?; info!( "Initialized persistent vector storage at {:?}", self.storage_path ); Ok(()) } /// Load storage from disk fn load_from_disk(&self) -> Result<()> { let mut file = File::open(&self.storage_path)?; // Read header size let mut header_size_bytes = [0u8; 8]; file.read_exact(&mut header_size_bytes)?; let header_size = u64::from_le_bytes(header_size_bytes); // Read header let mut header_bytes = vec![0u8; header_size as usize]; file.read_exact(&mut header_bytes)?; let loaded_header: StorageHeader = bincode::deserialize(&header_bytes) .map_err(|e| CodeGraphError::Vector(e.to_string()))?; // Verify header integrity if loaded_header.version != 1 { return Err(CodeGraphError::Vector( "Unsupported storage version".to_string(), )); } *self.header.write() = loaded_header.clone(); *self.next_vector_id.write() = loaded_header.vector_count; // Load metadata if available if loaded_header.metadata_offset > 0 { file.seek(SeekFrom::Start(loaded_header.metadata_offset))?; let mut metadata_size_bytes = [0u8; 8]; file.read_exact(&mut metadata_size_bytes)?; let metadata_size = u64::from_le_bytes(metadata_size_bytes); if metadata_size > 0 { let mut metadata_bytes = vec![0u8; metadata_size as usize]; file.read_exact(&mut metadata_bytes)?; let loaded_metadata: HashMap<NodeId, VectorMetadata> = bincode::deserialize(&metadata_bytes) .map_err(|e| CodeGraphError::Vector(e.to_string()))?; // Build reverse mapping let mut vector_id_mapping = HashMap::new(); for (node_id, metadata) in &loaded_metadata { vector_id_mapping.insert(metadata.vector_id, *node_id); } *self.metadata.write() = loaded_metadata; *self.vector_id_mapping.write() = vector_id_mapping; } } // Load update log if exists if self.log_path.exists() { if let Ok(log_data) = std::fs::read(&self.log_path) { if let Ok(log_entries) = bincode::deserialize::<Vec<UpdateLogEntry>>(&log_data) { *self.update_log.lock() = log_entries; } } } info!( "Loaded persistent vector storage with {} vectors", loaded_header.vector_count ); Ok(()) } /// Save storage to disk async fn save_to_disk(&self) -> Result<()> { let temp_path = self.storage_path.with_extension("tmp"); { let mut file = OpenOptions::new() .create(true) .write(true) .truncate(true) .open(&temp_path)?; let mut header = self.header.write(); header.last_modified = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); // Calculate offsets let header_bytes = bincode::serialize(&*header).map_err(|e| CodeGraphError::Vector(e.to_string()))?; let header_section_size = 8 + header_bytes.len() as u64; header.metadata_offset = header_section_size; // Write header file.write_all(&(header_bytes.len() as u64).to_le_bytes())?; file.write_all( &bincode::serialize(&*header).map_err(|e| CodeGraphError::Vector(e.to_string()))?, )?; // Write metadata let metadata = self.metadata.read(); let metadata_bytes = bincode::serialize(&*metadata) .map_err(|e| CodeGraphError::Vector(e.to_string()))?; file.write_all(&(metadata_bytes.len() as u64).to_le_bytes())?; file.write_all(&metadata_bytes)?; file.flush()?; } // Atomic replace fs::rename(&temp_path, &self.storage_path).await?; // Save update log let log_entries = { let log = self.update_log.lock(); log.clone() }; if !log_entries.is_empty() { let log_bytes = bincode::serialize(&log_entries) .map_err(|e| CodeGraphError::Vector(e.to_string()))?; fs::write(&self.log_path, log_bytes).await?; } info!("Saved persistent vector storage"); Ok(()) } /// Enable product quantization compression pub fn enable_product_quantization(&self, m: usize, nbits: u32) -> Result<()> { let header = self.header.read(); let pq = ProductQuantizer::new(header.dimension, m, nbits)?; *self.pq_quantizer.write() = Some(pq); info!("Enabled product quantization with m={}, nbits={}", m, nbits); Ok(()) } /// Enable scalar quantization compression pub fn enable_scalar_quantization(&self, nbits: u32, uniform: bool) -> Result<()> { let header = self.header.read(); let sq = ScalarQuantizer::new(header.dimension, nbits, uniform); *self.sq_quantizer.write() = Some(sq); info!( "Enabled scalar quantization with nbits={}, uniform={}", nbits, uniform ); Ok(()) } /// Create a backup of the current storage pub async fn create_backup(&self) -> Result<PathBuf> { let timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); let backup_file = self.backup_path.join(format!("backup_{}.db", timestamp)); fs::copy(&self.storage_path, &backup_file).await?; if self.log_path.exists() { let backup_log = self.backup_path.join(format!("backup_{}.log", timestamp)); fs::copy(&self.log_path, &backup_log).await?; } info!("Created backup at {:?}", backup_file); Ok(backup_file) } /// Restore from a backup pub async fn restore_from_backup<P: AsRef<Path>>(&self, backup_path: P) -> Result<()> { let backup_path = backup_path.as_ref(); if !backup_path.exists() { return Err(CodeGraphError::Vector("Backup file not found".to_string())); } // Create current backup before restoring self.create_backup().await?; // Replace current storage with backup fs::copy(backup_path, &self.storage_path).await?; // Reload from restored file self.load_from_disk()?; info!("Restored from backup {:?}", backup_path); Ok(()) } /// Apply incremental updates from the log pub async fn apply_incremental_updates(&self) -> Result<()> { let log_entries = { let log = self.update_log.lock(); log.clone() }; if log_entries.is_empty() { return Ok(()); } info!("Applying {} incremental updates", log_entries.len()); for entry in log_entries { match entry.operation { UpdateOperation::Insert | UpdateOperation::Update => { if let Some(vector_data) = entry.vector_data { self.store_single_vector(entry.node_id, &vector_data) .await?; } } UpdateOperation::Delete => { self.delete_single_vector(entry.node_id).await?; } } } // Clear the log after applying updates self.update_log.lock().clear(); // Remove log file if self.log_path.exists() { fs::remove_file(&self.log_path).await?; } info!("Applied incremental updates successfully"); Ok(()) } /// Store a single vector with optional compression async fn store_single_vector(&self, node_id: NodeId, vector: &[f32]) -> Result<()> { let vector_id = { let mut next_id = self.next_vector_id.write(); let id = *next_id; *next_id += 1; id }; let norm = vector.iter().map(|x| x * x).sum::<f32>().sqrt(); let compressed_data; let compressed_size; let compression_type; // Try compression if enabled if let Some(pq) = self.pq_quantizer.read().as_ref() { if pq.trained { compressed_data = pq.encode(vector)?; compressed_size = compressed_data.len(); compression_type = CompressionType::ProductQuantization { m: pq.m, nbits: pq.nbits, }; } else { compressed_data = bincode::serialize(vector) .map_err(|e| CodeGraphError::Vector(e.to_string()))?; compressed_size = compressed_data.len(); compression_type = CompressionType::None; } } else if let Some(sq) = self.sq_quantizer.read().as_ref() { if sq.trained { compressed_data = sq.encode(vector)?; compressed_size = compressed_data.len(); compression_type = CompressionType::ScalarQuantization { nbits: sq.nbits, uniform: sq.uniform, }; } else { compressed_data = bincode::serialize(vector) .map_err(|e| CodeGraphError::Vector(e.to_string()))?; compressed_size = compressed_data.len(); compression_type = CompressionType::None; } } else { compressed_data = bincode::serialize(vector).map_err(|e| CodeGraphError::Vector(e.to_string()))?; compressed_size = compressed_data.len(); compression_type = CompressionType::None; } let metadata = VectorMetadata { node_id, vector_id, timestamp: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), norm, compressed: compression_type != CompressionType::None, compressed_size, }; // Update mappings { let mut meta_map = self.metadata.write(); let mut vector_map = self.vector_id_mapping.write(); meta_map.insert(node_id, metadata); vector_map.insert(vector_id, node_id); } // Update header { let mut header = self.header.write(); header.vector_count = header.vector_count.max(vector_id + 1); header.compression_type = compression_type; } debug!( "Stored vector for node {} with compression ratio {:.2}", node_id, vector.len() * 4 / compressed_size.max(1) ); Ok(()) } /// Delete a single vector async fn delete_single_vector(&self, node_id: NodeId) -> Result<()> { let vector_id = { let mut meta_map = self.metadata.write(); if let Some(metadata) = meta_map.remove(&node_id) { metadata.vector_id } else { return Ok(()); // Vector not found, nothing to delete } }; { let mut vector_map = self.vector_id_mapping.write(); vector_map.remove(&vector_id); } debug!("Deleted vector for node {}", node_id); Ok(()) } /// Train quantizers on existing vectors pub async fn train_quantizers(&self, sample_vectors: &[Vec<f32>]) -> Result<()> { if sample_vectors.is_empty() { return Err(CodeGraphError::Vector( "No vectors provided for training".to_string(), )); } // Train PQ if enabled if let Some(mut pq) = self.pq_quantizer.write().as_mut() { pq.train(sample_vectors)?; info!("Trained product quantizer"); } // Train SQ if enabled if let Some(mut sq) = self.sq_quantizer.write().as_mut() { sq.train(sample_vectors)?; info!("Trained scalar quantizer"); } Ok(()) } /// Get storage statistics pub fn get_stats(&self) -> Result<StorageStats> { let header = self.header.read(); let metadata = self.metadata.read(); let total_vectors = header.vector_count; let active_vectors = metadata.len(); let storage_size = std::fs::metadata(&self.storage_path) .map(|m| m.len()) .unwrap_or(0); let compressed_vectors = metadata.values().filter(|m| m.compressed).count(); let avg_compression_ratio = if compressed_vectors > 0 { let original_size = header.dimension * 4; // f32 size let total_compressed_size: usize = metadata .values() .filter(|m| m.compressed) .map(|m| m.compressed_size) .sum(); (original_size * compressed_vectors) as f64 / total_compressed_size as f64 } else { 1.0 }; Ok(StorageStats { total_vectors, active_vectors, storage_size_bytes: storage_size, compressed_vectors, compression_ratio: avg_compression_ratio, dimension: header.dimension, last_modified: header.last_modified, incremental_enabled: header.incremental_enabled, }) } } /// Storage statistics #[derive(Debug, Clone)] pub struct StorageStats { pub total_vectors: u64, pub active_vectors: usize, pub storage_size_bytes: u64, pub compressed_vectors: usize, pub compression_ratio: f64, pub dimension: usize, pub last_modified: u64, pub incremental_enabled: bool, } #[async_trait] impl VectorStore for PersistentVectorStore { async fn store_embeddings(&mut self, nodes: &[CodeNode]) -> Result<()> { let vectors_with_embeddings: Vec<_> = nodes .iter() .filter_map(|node| node.embedding.as_ref().map(|emb| (node.id, emb.clone()))) .collect(); if vectors_with_embeddings.is_empty() { return Ok(()); } // Train quantizers if not already trained let sample_vectors: Vec<Vec<f32>> = vectors_with_embeddings .iter() .map(|(_, emb)| emb.clone()) .collect(); self.train_quantizers(&sample_vectors).await?; // Store vectors for (node_id, embedding) in vectors_with_embeddings { self.store_single_vector(node_id, &embedding).await?; // Add to incremental log let log_entry = UpdateLogEntry { operation: UpdateOperation::Insert, node_id, vector_id: Some(self.metadata.read().get(&node_id).unwrap().vector_id), timestamp: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), vector_data: Some(embedding), }; self.update_log.lock().push(log_entry); } // Save to disk self.save_to_disk().await?; info!("Stored {} embeddings to persistent storage", nodes.len()); Ok(()) } async fn search_similar(&self, query_embedding: &[f32], limit: usize) -> Result<Vec<NodeId>> { let header = self.header.read(); if query_embedding.len() != header.dimension { return Err(CodeGraphError::Vector(format!( "Query embedding dimension {} doesn't match storage dimension {}", query_embedding.len(), header.dimension ))); } let metadata = self.metadata.read(); if metadata.is_empty() { return Ok(Vec::new()); } // For now, implement brute-force search // In production, this would use FAISS or similar indexing let mut similarities: Vec<(NodeId, f32)> = Vec::new(); for (node_id, meta) in metadata.iter() { // Reconstruct vector (simplified - in reality would decompress properly) let reconstructed_vector = if meta.compressed { // For demo purposes, assume we can reconstruct // In reality, this would use the appropriate quantizer vec![0.0; header.dimension] } else { // Load uncompressed vector (would read from disk) vec![0.0; header.dimension] }; // Calculate cosine similarity let dot_product: f32 = query_embedding .iter() .zip(reconstructed_vector.iter()) .map(|(a, b)| a * b) .sum(); let query_norm: f32 = query_embedding.iter().map(|x| x * x).sum::<f32>().sqrt(); let similarity = dot_product / (query_norm * meta.norm); similarities.push((*node_id, similarity)); } // Sort by similarity and take top results similarities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap()); similarities.truncate(limit); Ok(similarities.into_iter().map(|(id, _)| id).collect()) } async fn get_embedding(&self, node_id: NodeId) -> Result<Option<Vec<f32>>> { let metadata = self.metadata.read(); let meta = match metadata.get(&node_id) { Some(meta) => meta, None => return Ok(None), }; if meta.compressed { // Decompress the vector let header = self.header.read(); match header.compression_type { CompressionType::ProductQuantization { .. } => { if let Some(pq) = self.pq_quantizer.read().as_ref() { // In reality, would load compressed data from disk let dummy_codes = vec![0u8; pq.m]; let decompressed = pq.decode(&dummy_codes)?; Ok(Some(decompressed)) } else { Err(CodeGraphError::Vector( "PQ quantizer not available".to_string(), )) } } CompressionType::ScalarQuantization { .. } => { if let Some(sq) = self.sq_quantizer.read().as_ref() { // In reality, would load compressed data from disk let dummy_encoded = vec![0u8; header.dimension]; let decompressed = sq.decode(&dummy_encoded)?; Ok(Some(decompressed)) } else { Err(CodeGraphError::Vector( "SQ quantizer not available".to_string(), )) } } CompressionType::None => { // Load uncompressed vector from disk Ok(Some(vec![0.0; self.header.read().dimension])) } } } else { // Load uncompressed vector from disk Ok(Some(vec![0.0; self.header.read().dimension])) } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server