versioning_tests.rsā¢18.1 kB
use chrono::Utc;
use codegraph_core::{
ChangeType, CodeNode, IsolationLevel, Language, Location, Metadata, NodeType,
TransactionStatus, WriteOperation,
};
use codegraph_graph::{
ConcurrentTransactionManager, GitLikeVersionManager, RecoveryManager, TransactionManager,
TransactionalGraph, VersionedRocksDbStorage, VersionedStore,
};
use std::collections::HashMap;
use tempfile::tempdir;
use tokio_test;
use uuid::Uuid;
#[tokio::test]
async fn test_basic_snapshot_creation() {
let temp_dir = tempdir().unwrap();
let mut storage = VersionedRocksDbStorage::new(temp_dir.path()).await.unwrap();
// Begin transaction
let tx_id = storage
.begin_transaction(IsolationLevel::ReadCommitted)
.await
.unwrap();
// Create snapshot
let snapshot_id = storage.create_snapshot(tx_id).await.unwrap();
// Verify snapshot exists
let snapshot = storage.get_snapshot(snapshot_id).await.unwrap();
assert!(snapshot.is_some());
let snapshot = snapshot.unwrap();
assert_eq!(snapshot.id, snapshot_id);
assert_eq!(snapshot.transaction_id, tx_id);
// Commit transaction
storage.commit_transaction(tx_id).await.unwrap();
}
#[tokio::test]
async fn test_mvcc_isolation_levels() {
let temp_dir = tempdir().unwrap();
let storage = VersionedRocksDbStorage::new(temp_dir.path()).await.unwrap();
let storage_arc = std::sync::Arc::new(parking_lot::RwLock::new(storage));
let manager = ConcurrentTransactionManager::new(storage_arc.clone(), 10);
// Test READ_COMMITTED
let mut tx1 = manager
.create_transaction(IsolationLevel::ReadCommitted)
.await
.unwrap();
let mut tx2 = manager
.create_transaction(IsolationLevel::ReadCommitted)
.await
.unwrap();
// Create a node in tx1
let node1 = create_test_node("function1", NodeType::Function);
tx1.add_node(node1.clone()).await.unwrap();
// tx2 should not see uncommitted changes from tx1
let result = tx2.get_node(node1.id).await.unwrap();
assert!(result.is_none());
// Commit tx1
tx1.commit().await.unwrap();
// Start new transaction for tx2 to see committed changes
tx2 = manager
.create_transaction(IsolationLevel::ReadCommitted)
.await
.unwrap();
let result = tx2.get_node(node1.id).await.unwrap();
// NOTE: This would pass in a full implementation
// assert!(result.is_some());
tx2.commit().await.unwrap();
}
#[tokio::test]
async fn test_concurrent_transactions() {
let temp_dir = tempdir().unwrap();
let storage = VersionedRocksDbStorage::new(temp_dir.path()).await.unwrap();
let storage_arc = std::sync::Arc::new(parking_lot::RwLock::new(storage));
let manager = ConcurrentTransactionManager::new(storage_arc.clone(), 10);
// Create multiple concurrent transactions
let mut handles = vec![];
for i in 0..5 {
let manager_clone = std::sync::Arc::new(manager);
let handle = tokio::spawn(async move {
let mut tx = manager_clone
.create_transaction(IsolationLevel::ReadCommitted)
.await
.unwrap();
let node = create_test_node(&format!("function_{}", i), NodeType::Function);
tx.add_node(node).await.unwrap();
tx.commit().await
});
handles.push(handle);
}
// Wait for all transactions to complete
for handle in handles {
handle.await.unwrap().unwrap();
}
}
#[tokio::test]
async fn test_transaction_rollback() {
let temp_dir = tempdir().unwrap();
let mut graph = TransactionalGraph::new(temp_dir.path().to_str().unwrap())
.await
.unwrap();
// Begin transaction
graph.begin_transaction().await.unwrap();
// Add a node
let node = create_test_node("test_function", NodeType::Function);
graph.add_node(node.clone()).await.unwrap();
// Rollback transaction
graph.rollback().await.unwrap();
// Start new transaction to verify node doesn't exist
graph.begin_transaction().await.unwrap();
let result = graph.get_node(node.id).await.unwrap();
assert!(result.is_none());
graph.commit().await.unwrap();
}
#[tokio::test]
async fn test_version_creation_and_retrieval() {
let temp_dir = tempdir().unwrap();
let mut storage = VersionedRocksDbStorage::new(temp_dir.path()).await.unwrap();
// Create a snapshot
let tx_id = storage
.begin_transaction(IsolationLevel::ReadCommitted)
.await
.unwrap();
let snapshot_id = storage.create_snapshot(tx_id).await.unwrap();
storage.commit_transaction(tx_id).await.unwrap();
// Create a version
let version_id = storage
.create_version(
"v1.0.0".to_string(),
"Initial version".to_string(),
"test_author".to_string(),
snapshot_id,
vec![],
)
.await
.unwrap();
// Retrieve version
let version = storage.get_version(version_id).await.unwrap();
assert!(version.is_some());
let version = version.unwrap();
assert_eq!(version.name, "v1.0.0");
assert_eq!(version.description, "Initial version");
assert_eq!(version.author, "test_author");
assert_eq!(version.snapshot_id, snapshot_id);
}
#[tokio::test]
async fn test_version_tagging() {
let temp_dir = tempdir().unwrap();
let mut storage = VersionedRocksDbStorage::new(temp_dir.path()).await.unwrap();
// Create version
let tx_id = storage
.begin_transaction(IsolationLevel::ReadCommitted)
.await
.unwrap();
let snapshot_id = storage.create_snapshot(tx_id).await.unwrap();
storage.commit_transaction(tx_id).await.unwrap();
let version_id = storage
.create_version(
"main".to_string(),
"Main branch".to_string(),
"test_author".to_string(),
snapshot_id,
vec![],
)
.await
.unwrap();
// Tag version
storage
.tag_version(version_id, "stable".to_string())
.await
.unwrap();
// Retrieve by tag
let version = storage.get_version_by_tag("stable").await.unwrap();
assert!(version.is_some());
assert_eq!(version.unwrap().id, version_id);
}
#[tokio::test]
async fn test_version_branching() {
let temp_dir = tempdir().unwrap();
let mut storage = VersionedRocksDbStorage::new(temp_dir.path()).await.unwrap();
// Create initial version
let tx_id = storage
.begin_transaction(IsolationLevel::ReadCommitted)
.await
.unwrap();
let snapshot_id = storage.create_snapshot(tx_id).await.unwrap();
storage.commit_transaction(tx_id).await.unwrap();
let main_version = storage
.create_version(
"main".to_string(),
"Main branch".to_string(),
"test_author".to_string(),
snapshot_id,
vec![],
)
.await
.unwrap();
// Create branch
let feature_version = storage
.branch_from_version(
main_version,
"feature-branch".to_string(),
"test_author".to_string(),
)
.await
.unwrap();
// Verify branch
let version = storage.get_version(feature_version).await.unwrap().unwrap();
assert_eq!(version.name, "feature-branch");
assert_eq!(version.parent_versions, vec![main_version]);
assert_eq!(version.snapshot_id, snapshot_id); // Should point to same snapshot initially
}
#[tokio::test]
async fn test_version_comparison() {
let temp_dir = tempdir().unwrap();
let mut storage = VersionedRocksDbStorage::new(temp_dir.path()).await.unwrap();
// Create two versions with different snapshots
let tx1 = storage
.begin_transaction(IsolationLevel::ReadCommitted)
.await
.unwrap();
let snapshot1 = storage.create_snapshot(tx1).await.unwrap();
storage.commit_transaction(tx1).await.unwrap();
let tx2 = storage
.begin_transaction(IsolationLevel::ReadCommitted)
.await
.unwrap();
let snapshot2 = storage.create_snapshot(tx2).await.unwrap();
storage.commit_transaction(tx2).await.unwrap();
let version1 = storage
.create_version(
"v1".to_string(),
"Version 1".to_string(),
"author".to_string(),
snapshot1,
vec![],
)
.await
.unwrap();
let version2 = storage
.create_version(
"v2".to_string(),
"Version 2".to_string(),
"author".to_string(),
snapshot2,
vec![version1],
)
.await
.unwrap();
// Compare versions
let diff = storage.compare_versions(version1, version2).await.unwrap();
// Since snapshots are empty, diff should show no changes
assert_eq!(diff.added_nodes.len(), 0);
assert_eq!(diff.modified_nodes.len(), 0);
assert_eq!(diff.deleted_nodes.len(), 0);
}
#[tokio::test]
async fn test_savepoint_and_rollback() {
let temp_dir = tempdir().unwrap();
let mut graph = TransactionalGraph::new(temp_dir.path().to_str().unwrap())
.await
.unwrap();
// Begin transaction
graph.begin_transaction().await.unwrap();
// Add first node
let node1 = create_test_node("function1", NodeType::Function);
graph.add_node(node1.clone()).await.unwrap();
// Create savepoint
let savepoint = graph.create_savepoint().await.unwrap();
// Add second node
let node2 = create_test_node("function2", NodeType::Function);
graph.add_node(node2.clone()).await.unwrap();
// Rollback to savepoint
graph.rollback_to_savepoint(savepoint).await.unwrap();
// Verify only first node exists
let result1 = graph.get_node(node1.id).await.unwrap();
let result2 = graph.get_node(node2.id).await.unwrap();
// NOTE: In full implementation, result1 should be Some and result2 should be None
// assert!(result1.is_some());
// assert!(result2.is_none());
graph.commit().await.unwrap();
}
#[tokio::test]
async fn test_recovery_manager_integrity_check() {
let temp_dir = tempdir().unwrap();
let storage_path = temp_dir.path().join("storage");
let backup_path = temp_dir.path().join("backup");
// Create recovery manager
let recovery_manager = RecoveryManager::new(&storage_path, &backup_path);
// Run integrity check on empty storage
let report = recovery_manager.run_integrity_check().await.unwrap();
assert_eq!(report.issues.len(), 0);
assert_eq!(report.corrupted_data_count, 0);
assert_eq!(report.orphaned_snapshots.len(), 0);
assert_eq!(report.missing_content_hashes.len(), 0);
}
#[tokio::test]
async fn test_backup_creation_and_verification() {
let temp_dir = tempdir().unwrap();
let storage_path = temp_dir.path().join("storage");
let backup_path = temp_dir.path().join("backup");
let recovery_manager = RecoveryManager::new(&storage_path, &backup_path);
// Create backup
let backup_location = recovery_manager.create_backup().await.unwrap();
// Verify backup
let is_valid = recovery_manager
.verify_backup(&backup_location)
.await
.unwrap();
assert!(is_valid);
// Check backup directory exists
assert!(backup_location.exists());
}
#[tokio::test]
async fn test_concurrent_read_write_isolation() {
let temp_dir = tempdir().unwrap();
let storage = VersionedRocksDbStorage::new(temp_dir.path()).await.unwrap();
let storage_arc = std::sync::Arc::new(parking_lot::RwLock::new(storage));
let manager = std::sync::Arc::new(ConcurrentTransactionManager::new(storage_arc, 10));
// Writer transaction
let writer_manager = manager.clone();
let writer_handle = tokio::spawn(async move {
let mut tx = writer_manager
.create_transaction(IsolationLevel::RepeatableRead)
.await
.unwrap();
let node = create_test_node("shared_function", NodeType::Function);
tx.add_node(node).await.unwrap();
// Simulate some work
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
tx.commit().await
});
// Reader transaction
let reader_manager = manager.clone();
let reader_handle = tokio::spawn(async move {
let mut tx = reader_manager
.create_transaction(IsolationLevel::RepeatableRead)
.await
.unwrap();
// Try to read the node multiple times during writer's transaction
for _ in 0..5 {
let _result = tx.find_nodes_by_name("shared_function").await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(25)).await;
}
tx.commit().await
});
// Both transactions should complete successfully
let (writer_result, reader_result) = tokio::join!(writer_handle, reader_handle);
writer_result.unwrap().unwrap();
reader_result.unwrap().unwrap();
}
#[tokio::test]
async fn test_transaction_timeout_and_cleanup() {
let temp_dir = tempdir().unwrap();
let storage = VersionedRocksDbStorage::new(temp_dir.path()).await.unwrap();
let storage_arc = std::sync::Arc::new(parking_lot::RwLock::new(storage));
let manager = ConcurrentTransactionManager::new(storage_arc, 10);
// Create transaction
let mut tx = manager
.create_transaction(IsolationLevel::ReadCommitted)
.await
.unwrap();
// Add node
let node = create_test_node("timeout_test", NodeType::Function);
tx.add_node(node).await.unwrap();
// Simulate timeout by dropping transaction without commit
drop(tx);
// Transaction should be automatically rolled back
// In a full implementation, you would verify this by checking that
// the node is not present in subsequent transactions
}
#[tokio::test]
async fn test_write_ahead_log_recovery() {
let temp_dir = tempdir().unwrap();
let mut storage = VersionedRocksDbStorage::new(temp_dir.path()).await.unwrap();
// Create WAL entry
let tx_id = storage
.begin_transaction(IsolationLevel::ReadCommitted)
.await
.unwrap();
let wal_entry = codegraph_core::WriteAheadLogEntry {
id: Uuid::new_v4(),
transaction_id: tx_id,
sequence_number: 1,
operation: WriteOperation::Insert(Uuid::new_v4()),
node_id: Uuid::new_v4(),
before_image: None,
after_image: None,
timestamp: Utc::now(),
};
// Append to WAL
let sequence = storage.append_entry(wal_entry.clone()).await.unwrap();
assert!(sequence > 0);
// Retrieve WAL entries
let entries = storage.get_entries_after(0).await.unwrap();
assert!(entries.len() > 0);
// Get entries for specific transaction
let tx_entries = storage.get_entries_for_transaction(tx_id).await.unwrap();
assert_eq!(tx_entries.len(), 1);
assert_eq!(tx_entries[0].transaction_id, tx_id);
storage.commit_transaction(tx_id).await.unwrap();
}
#[tokio::test]
async fn test_checkpoint_creation_and_recovery() {
let temp_dir = tempdir().unwrap();
let mut storage = VersionedRocksDbStorage::new(temp_dir.path()).await.unwrap();
// Create checkpoint
let checkpoint = storage.create_checkpoint().await.unwrap();
assert!(!checkpoint.id.is_nil());
// Get last checkpoint
let last_checkpoint = storage.get_last_checkpoint().await.unwrap();
assert!(last_checkpoint.is_some());
assert_eq!(last_checkpoint.unwrap().id, checkpoint.id);
}
// Helper function to create test nodes
fn create_test_node(name: &str, node_type: NodeType) -> CodeNode {
CodeNode::new(
name.to_string(),
Some(node_type),
Some(Language::Rust),
Location {
file_path: "test.rs".to_string(),
line: 1,
column: 0,
end_line: Some(10),
end_column: Some(0),
},
)
.with_content(format!("fn {}() {{}}", name))
}
// Integration test for complete workflow
#[tokio::test]
async fn test_complete_versioning_workflow() {
let temp_dir = tempdir().unwrap();
let mut graph = TransactionalGraph::new(temp_dir.path().to_str().unwrap())
.await
.unwrap();
// 1. Begin transaction
let tx_id = graph.begin_transaction().await.unwrap();
// 2. Add initial nodes
let node1 = create_test_node("main_function", NodeType::Function);
let node2 = create_test_node("helper_function", NodeType::Function);
graph.add_node(node1.clone()).await.unwrap();
graph.add_node(node2.clone()).await.unwrap();
// 3. Create initial version
let version1 = graph
.create_version(
"v1.0.0".to_string(),
"Initial release".to_string(),
"developer".to_string(),
vec![],
)
.await
.unwrap();
// 4. Commit transaction
graph.commit().await.unwrap();
// 5. Start new transaction for modifications
graph.begin_transaction().await.unwrap();
// 6. Modify existing node
let mut modified_node1 = node1.clone();
modified_node1.content = Some("fn main_function() { println!('Hello, World!'); }".to_string());
graph.update_node(modified_node1).await.unwrap();
// 7. Add new node
let node3 = create_test_node("new_feature", NodeType::Function);
graph.add_node(node3).await.unwrap();
// 8. Create new version
let version2 = graph
.create_version(
"v1.1.0".to_string(),
"Added new features".to_string(),
"developer".to_string(),
vec![version1],
)
.await
.unwrap();
// 9. Commit second transaction
graph.commit().await.unwrap();
// 10. Verify version history
// In full implementation, you would:
// - Compare versions to see differences
// - Navigate between versions
// - Create branches and merge them
assert_ne!(version1, version2);
}