
CodeGraph CLI MCP Server

by Jakedismo
persistent_integration_tests.rs • 20.4 kB
#[cfg(feature = "persistent")] mod persistent_tests { use codegraph_core::{CodeNode, NodeId}; use codegraph_vector::{ CompressionType, ConsistencyConfig, ConsistencyManager, IncrementalConfig, IncrementalOperation, IncrementalUpdateManager, IsolationLevel, PersistentVectorStore, StorageStats, VectorOperation, }; use std::collections::HashMap; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tempfile::TempDir; use tokio_test; const TEST_DIMENSION: usize = 128; fn create_test_vectors(count: usize, dimension: usize) -> Vec<(NodeId, Vec<f32>)> { (0..count) .map(|i| { let vector: Vec<f32> = (0..dimension) .map(|j| (i * dimension + j) as f32 * 0.01) .collect(); (i as NodeId, vector) }) .collect() } fn create_test_nodes(vectors: &[(NodeId, Vec<f32>)]) -> Vec<CodeNode> { vectors .iter() .map(|(id, vector)| CodeNode { id: *id, name: format!("test_node_{}", id), node_type: codegraph_core::NodeType::Function, content: format!("Content for node {}", id), file_path: format!("test_{}.rs", id), start_line: *id as usize, end_line: *id as usize + 10, children: Vec::new(), embedding: Some(vector.clone()), metadata: HashMap::new(), }) .collect() } #[tokio::test] async fn test_persistent_storage_lifecycle() { let temp_dir = TempDir::new().unwrap(); let storage_path = temp_dir.path().join("test_vectors.db"); let backup_path = temp_dir.path().join("backups"); // Create persistent storage let mut store = PersistentVectorStore::new(&storage_path, &backup_path, TEST_DIMENSION).unwrap(); // Generate test data let test_vectors = create_test_vectors(100, TEST_DIMENSION); let test_nodes = create_test_nodes(&test_vectors); // Store embeddings store.store_embeddings(&test_nodes).await.unwrap(); // Verify storage statistics let stats = store.get_stats().unwrap(); assert_eq!(stats.active_vectors, test_nodes.len()); assert_eq!(stats.dimension, TEST_DIMENSION); // Test similarity search let query_vector = &test_vectors[0].1; let similar_nodes = store.search_similar(query_vector, 10).await.unwrap(); assert!(!similar_nodes.is_empty()); assert!(similar_nodes.contains(&test_vectors[0].0)); // Test individual vector retrieval for (node_id, expected_vector) in &test_vectors[0..5] { let retrieved = store.get_embedding(*node_id).await.unwrap(); assert!(retrieved.is_some()); // Note: In real implementation, we'd compare the vectors // Here we're just checking that retrieval works } } #[tokio::test] async fn test_vector_compression() { let temp_dir = TempDir::new().unwrap(); let storage_path = temp_dir.path().join("test_compressed.db"); let backup_path = temp_dir.path().join("backups"); let mut store = PersistentVectorStore::new(&storage_path, &backup_path, TEST_DIMENSION).unwrap(); // Enable product quantization store.enable_product_quantization(16, 8).unwrap(); let test_vectors = create_test_vectors(50, TEST_DIMENSION); let test_nodes = create_test_nodes(&test_vectors); // Store embeddings (should trigger training and compression) store.store_embeddings(&test_nodes).await.unwrap(); let stats = store.get_stats().unwrap(); assert!(stats.compression_ratio > 1.0, "Should achieve compression"); assert_eq!(stats.compressed_vectors, test_nodes.len()); } #[tokio::test] async fn test_scalar_quantization() { let temp_dir = TempDir::new().unwrap(); let storage_path = temp_dir.path().join("test_scalar.db"); let backup_path = temp_dir.path().join("backups"); let mut store = PersistentVectorStore::new(&storage_path, &backup_path, TEST_DIMENSION).unwrap(); // Enable scalar quantization store.enable_scalar_quantization(8, false).unwrap(); 
let test_vectors = create_test_vectors(30, TEST_DIMENSION); let test_nodes = create_test_nodes(&test_vectors); store.store_embeddings(&test_nodes).await.unwrap(); let stats = store.get_stats().unwrap(); assert!(stats.compression_ratio > 1.0); } #[tokio::test] async fn test_backup_and_recovery() { let temp_dir = TempDir::new().unwrap(); let storage_path = temp_dir.path().join("test_backup.db"); let backup_path = temp_dir.path().join("backups"); let mut store = PersistentVectorStore::new(&storage_path, &backup_path, TEST_DIMENSION).unwrap(); let test_vectors = create_test_vectors(20, TEST_DIMENSION); let test_nodes = create_test_nodes(&test_vectors); store.store_embeddings(&test_nodes).await.unwrap(); // Create backup let backup_file = store.create_backup().await.unwrap(); assert!(backup_file.exists()); // Simulate corruption by creating new storage let mut new_store = PersistentVectorStore::new( temp_dir.path().join("test_backup2.db"), &backup_path, TEST_DIMENSION, ) .unwrap(); // Restore from backup new_store.restore_from_backup(&backup_file).await.unwrap(); // Verify data integrity after restore let stats = new_store.get_stats().unwrap(); assert_eq!(stats.active_vectors, test_nodes.len()); } #[tokio::test] async fn test_incremental_updates() { let config = IncrementalConfig { max_batch_size: 10, batch_timeout: Duration::from_millis(50), worker_threads: 2, enable_parallel_processing: true, ..Default::default() }; let manager = IncrementalUpdateManager::new(config).unwrap(); // Submit insert operations let insert_ops: Vec<_> = (0..25) .map(|i| IncrementalOperation::Insert { node_id: i, vector: vec![i as f32; TEST_DIMENSION], timestamp: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), }) .collect(); manager.submit_batch(insert_ops).unwrap(); // Wait for processing tokio::time::sleep(Duration::from_millis(200)).await; let stats = manager.get_stats(); assert!(stats.successful_operations > 0); assert!(stats.batches_processed > 0); // Test update operations let update_ops: Vec<_> = (0..5) .map(|i| IncrementalOperation::Update { node_id: i, old_vector: Some(vec![i as f32; TEST_DIMENSION]), new_vector: vec![(i + 100) as f32; TEST_DIMENSION], timestamp: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), }) .collect(); manager.submit_batch(update_ops).unwrap(); tokio::time::sleep(Duration::from_millis(100)).await; // Test delete operations let delete_ops: Vec<_> = (20..25) .map(|i| IncrementalOperation::Delete { node_id: i, timestamp: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), }) .collect(); manager.submit_batch(delete_ops).unwrap(); tokio::time::sleep(Duration::from_millis(100)).await; let final_stats = manager.get_stats(); assert!(final_stats.successful_operations >= stats.successful_operations); } #[tokio::test] async fn test_segment_management() { let config = IncrementalConfig { max_segment_size: 1000, // Small segments for testing max_batch_size: 5, batch_timeout: Duration::from_millis(10), ..Default::default() }; let manager = IncrementalUpdateManager::new(config).unwrap(); // Add vectors to trigger multiple segments for batch in 0..10 { let ops: Vec<_> = (0..10) .map(|i| { let node_id = batch * 10 + i; IncrementalOperation::Insert { node_id, vector: vec![1.0; 100], // Large vectors timestamp: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), } }) .collect(); manager.submit_batch(ops).unwrap(); } // Wait for processing tokio::time::sleep(Duration::from_millis(300)).await; let segments = manager.get_segments(); 
assert!(segments.len() > 1, "Should create multiple segments"); // Test segment merging let initial_count = segments.len(); let merged_count = manager.merge_segments(5).await.unwrap(); if merged_count > 0 { let new_segments = manager.get_segments(); assert!( new_segments.len() <= initial_count, "Should reduce segment count" ); } } #[tokio::test] async fn test_consistency_manager() { let config = ConsistencyConfig::default(); let manager = ConsistencyManager::new(config); // Test transaction lifecycle let txn_id = manager .begin_transaction(IsolationLevel::ReadCommitted) .unwrap(); // Add operations to transaction let operations = vec![ VectorOperation::Insert { node_id: 1, vector: vec![1.0, 2.0, 3.0], }, VectorOperation::Update { node_id: 2, old_vector: Some(vec![2.0, 3.0, 4.0]), new_vector: vec![3.0, 4.0, 5.0], }, ]; for op in operations { manager.add_operation(txn_id, op).unwrap(); } // Test two-phase commit manager.prepare_transaction(txn_id).unwrap(); manager.commit_transaction(txn_id).unwrap(); let stats = manager.get_transaction_stats(); assert_eq!(stats.committed_transactions, 1); } #[tokio::test] async fn test_transaction_isolation() { let config = ConsistencyConfig::default(); let manager = ConsistencyManager::new(config); // Start two transactions let txn1 = manager .begin_transaction(IsolationLevel::Serializable) .unwrap(); let txn2 = manager .begin_transaction(IsolationLevel::Serializable) .unwrap(); // Both try to modify the same node let op1 = VectorOperation::Insert { node_id: 1, vector: vec![1.0, 2.0, 3.0], }; let op2 = VectorOperation::Update { node_id: 1, old_vector: None, new_vector: vec![4.0, 5.0, 6.0], }; manager.add_operation(txn1, op1).unwrap(); // Second transaction should detect conflict with serializable isolation let result = manager.add_operation(txn2, op2); assert!(result.is_err(), "Should detect serialization conflict"); // Clean up first transaction manager.abort_transaction(txn1).unwrap(); } #[tokio::test] async fn test_lock_management() { use codegraph_vector::LockMode; let config = ConsistencyConfig::default(); let manager = ConsistencyManager::new(config); let txn1 = manager .begin_transaction(IsolationLevel::ReadCommitted) .unwrap(); let txn2 = manager .begin_transaction(IsolationLevel::ReadCommitted) .unwrap(); // Transaction 1 acquires shared lock manager .acquire_lock(txn1, 1, LockMode::Shared) .await .unwrap(); // Transaction 2 can also acquire shared lock manager .acquire_lock(txn2, 1, LockMode::Shared) .await .unwrap(); // But transaction 2 cannot acquire exclusive lock (should timeout) let result = tokio::time::timeout( Duration::from_millis(100), manager.acquire_lock(txn2, 1, LockMode::Exclusive), ) .await; assert!(result.is_err(), "Should timeout on exclusive lock conflict"); // Clean up manager.abort_transaction(txn1).unwrap(); manager.abort_transaction(txn2).unwrap(); } #[tokio::test] async fn test_transaction_abort_and_rollback() { let config = ConsistencyConfig::default(); let manager = ConsistencyManager::new(config); let txn_id = manager .begin_transaction(IsolationLevel::ReadCommitted) .unwrap(); // Add some operations let operations = vec![ VectorOperation::Insert { node_id: 10, vector: vec![10.0, 20.0, 30.0], }, VectorOperation::Update { node_id: 11, old_vector: Some(vec![1.0, 2.0, 3.0]), new_vector: vec![11.0, 22.0, 33.0], }, VectorOperation::Delete { node_id: 12, vector: Some(vec![12.0, 24.0, 36.0]), }, ]; for op in operations { manager.add_operation(txn_id, op).unwrap(); } // Abort transaction and get rollback operations let 
rollback_ops = manager.abort_transaction(txn_id).unwrap(); assert_eq!(rollback_ops.len(), 3); // Verify rollback operations are in reverse order and correct type match &rollback_ops[0] { VectorOperation::Insert { node_id, .. } => { assert_eq!(*node_id, 12); // Should restore deleted item } _ => panic!("Expected insert operation for delete rollback"), } match &rollback_ops[1] { VectorOperation::Update { node_id, new_vector, .. } => { assert_eq!(*node_id, 11); assert_eq!(*new_vector, vec![1.0, 2.0, 3.0]); // Should restore old vector } _ => panic!("Expected update operation for update rollback"), } match &rollback_ops[2] { VectorOperation::Delete { node_id, .. } => { assert_eq!(*node_id, 10); // Should delete inserted item } _ => panic!("Expected delete operation for insert rollback"), } } #[tokio::test] async fn test_consistency_checkpoints() { let config = ConsistencyConfig::default(); let manager = ConsistencyManager::new(config); // Create and commit some transactions for i in 0..5 { let txn_id = manager .begin_transaction(IsolationLevel::ReadCommitted) .unwrap(); let op = VectorOperation::Insert { node_id: i, vector: vec![i as f32; 3], }; manager.add_operation(txn_id, op).unwrap(); manager.prepare_transaction(txn_id).unwrap(); manager.commit_transaction(txn_id).unwrap(); } // Create checkpoint let checkpoint = manager.create_checkpoint().unwrap(); assert!(checkpoint.checkpoint_id > 0); assert_eq!(checkpoint.committed_transactions.len(), 5); // Verify we can retrieve the checkpoint let latest_checkpoint = manager.get_latest_checkpoint().unwrap(); assert_eq!(latest_checkpoint.checkpoint_id, checkpoint.checkpoint_id); } #[tokio::test] async fn test_storage_persistence() { let temp_dir = TempDir::new().unwrap(); let storage_path = temp_dir.path().join("test_persist.db"); let backup_path = temp_dir.path().join("backups"); // Create and populate storage { let mut store = PersistentVectorStore::new(&storage_path, &backup_path, TEST_DIMENSION).unwrap(); let test_vectors = create_test_vectors(50, TEST_DIMENSION); let test_nodes = create_test_nodes(&test_vectors); store.store_embeddings(&test_nodes).await.unwrap(); let stats = store.get_stats().unwrap(); assert_eq!(stats.active_vectors, 50); } // Reload storage and verify data persistence { let store = PersistentVectorStore::new(&storage_path, &backup_path, TEST_DIMENSION).unwrap(); let stats = store.get_stats().unwrap(); assert_eq!(stats.active_vectors, 50); assert_eq!(stats.dimension, TEST_DIMENSION); // Verify we can still perform searches let query_vector = vec![0.5; TEST_DIMENSION]; let results = store.search_similar(&query_vector, 5).await.unwrap(); assert!(!results.is_empty()); } } #[tokio::test] async fn test_incremental_updates_with_wal() { let config = IncrementalConfig { enable_wal: true, wal_flush_interval: Duration::from_millis(10), max_batch_size: 5, batch_timeout: Duration::from_millis(50), ..Default::default() }; let manager = IncrementalUpdateManager::new(config).unwrap(); // Submit operations that should be logged to WAL let operations: Vec<_> = (0..20) .map(|i| IncrementalOperation::Insert { node_id: i, vector: vec![i as f32; 10], timestamp: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), }) .collect(); for op in operations { manager.submit_operation(op).unwrap(); } // Wait for WAL flush tokio::time::sleep(Duration::from_millis(100)).await; let stats = manager.get_stats(); assert!(stats.successful_operations > 0); } #[tokio::test] async fn test_performance_under_load() { let temp_dir = 
TempDir::new().unwrap(); let storage_path = temp_dir.path().join("test_perf.db"); let backup_path = temp_dir.path().join("backups"); let mut store = PersistentVectorStore::new(&storage_path, &backup_path, TEST_DIMENSION).unwrap(); // Enable compression for better performance store.enable_product_quantization(16, 8).unwrap(); let start_time = SystemTime::now(); // Store a large number of vectors let batch_size = 100; let num_batches = 10; for batch in 0..num_batches { let vectors = create_test_vectors(batch_size, TEST_DIMENSION); let nodes = create_test_nodes(&vectors); store.store_embeddings(&nodes).await.unwrap(); } let storage_time = start_time.elapsed().unwrap(); println!( "Stored {} vectors in {:?}", batch_size * num_batches, storage_time ); // Test search performance let search_start = SystemTime::now(); let query_vector = vec![0.5; TEST_DIMENSION]; for _ in 0..50 { let _results = store.search_similar(&query_vector, 10).await.unwrap(); } let search_time = search_start.elapsed().unwrap(); println!("Performed 50 searches in {:?}", search_time); let stats = store.get_stats().unwrap(); assert_eq!(stats.active_vectors, batch_size * num_batches); assert!(stats.compression_ratio > 1.0); } }
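
These tests are gated behind the `persistent` cargo feature, so a default `cargo test` run skips the whole module. A minimal invocation sketch, assuming the file lives under `tests/` in a `codegraph-vector` package (the package name is inferred from the `codegraph_vector` imports above and is an assumption about the workspace layout):

cargo test -p codegraph-vector --features persistent --test persistent_integration_tests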

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.