Skip to main content
Glama

CodeGraph CLI MCP Server

by Jakedismo
consistency.rs31.5 kB
use async_trait::async_trait; use codegraph_core::{CodeGraphError, NodeId, Result}; use parking_lot::{Mutex, RwLock}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::watch; use tracing::{debug, error, info, warn}; /// Transaction isolation levels for vector operations #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] pub enum IsolationLevel { /// Read uncommitted - fastest but least consistent ReadUncommitted, /// Read committed - prevents dirty reads ReadCommitted, /// Repeatable read - prevents dirty and non-repeatable reads RepeatableRead, /// Serializable - strongest consistency guarantees Serializable, } /// Transaction state #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum TransactionState { Active, Preparing, Prepared, Committed, Aborted, Failed, } /// Vector operation within a transaction #[derive(Debug, Clone, Serialize, Deserialize)] pub enum VectorOperation { Insert { node_id: NodeId, vector: Vec<f32>, }, Update { node_id: NodeId, old_vector: Option<Vec<f32>>, new_vector: Vec<f32>, }, Delete { node_id: NodeId, vector: Option<Vec<f32>>, // For rollback }, } impl VectorOperation { pub fn node_id(&self) -> NodeId { match self { Self::Insert { node_id, .. } | Self::Update { node_id, .. } | Self::Delete { node_id, .. } => *node_id, } } /// Create the inverse operation for rollback pub fn inverse(&self) -> Option<Self> { match self { Self::Insert { node_id, .. } => Some(Self::Delete { node_id: *node_id, vector: None, }), Self::Update { node_id, old_vector, .. 
} => old_vector.as_ref().map(|old| Self::Update { node_id: *node_id, old_vector: None, new_vector: old.clone(), }), Self::Delete { node_id, vector } => vector.as_ref().map(|vec| Self::Insert { node_id: *node_id, vector: vec.clone(), }), } } } /// Transaction metadata #[derive(Debug, Clone)] pub struct Transaction { pub id: u64, pub isolation_level: IsolationLevel, pub state: TransactionState, pub operations: Vec<VectorOperation>, pub read_set: HashSet<NodeId>, pub write_set: HashSet<NodeId>, pub start_time: SystemTime, pub commit_time: Option<SystemTime>, pub timeout: Option<Duration>, } impl Transaction { pub fn new(id: u64, isolation_level: IsolationLevel) -> Self { Self { id, isolation_level, state: TransactionState::Active, operations: Vec::new(), read_set: HashSet::new(), write_set: HashSet::new(), start_time: SystemTime::now(), commit_time: None, timeout: Some(Duration::from_secs(30)), // Default 30s timeout } } pub fn add_operation(&mut self, operation: VectorOperation) { let node_id = operation.node_id(); match &operation { VectorOperation::Insert { .. } | VectorOperation::Update { .. } => { self.write_set.insert(node_id); } VectorOperation::Delete { .. 
} => { self.write_set.insert(node_id); } } self.operations.push(operation); } pub fn add_read(&mut self, node_id: NodeId) { self.read_set.insert(node_id); } pub fn is_expired(&self) -> bool { if let Some(timeout) = self.timeout { self.start_time.elapsed().unwrap_or_default() > timeout } else { false } } pub fn conflicts_with(&self, other: &Transaction) -> bool { // Check for write-write conflicts if !self.write_set.is_disjoint(&other.write_set) { return true; } // Check for read-write conflicts based on isolation level match self.isolation_level.max(other.isolation_level) { IsolationLevel::ReadUncommitted => false, IsolationLevel::ReadCommitted => false, IsolationLevel::RepeatableRead => { !self.read_set.is_disjoint(&other.write_set) || !other.read_set.is_disjoint(&self.write_set) } IsolationLevel::Serializable => { !self.read_set.is_disjoint(&other.write_set) || !other.read_set.is_disjoint(&self.write_set) } } } } /// Lock modes for fine-grained concurrency control #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum LockMode { Shared, Exclusive, IntentionShared, IntentionExclusive, SharedIntentionExclusive, } impl LockMode { pub fn is_compatible(self, other: LockMode) -> bool { use LockMode::*; match (self, other) { (Shared, Shared) | (Shared, IntentionShared) | (IntentionShared, Shared) | (IntentionShared, IntentionShared) | (IntentionShared, IntentionExclusive) | (IntentionExclusive, IntentionShared) => true, _ => false, } } pub fn is_stronger_than(self, other: LockMode) -> bool { use LockMode::*; match (self, other) { (Exclusive, _) => true, (SharedIntentionExclusive, Shared) | (SharedIntentionExclusive, IntentionShared) => { true } (IntentionExclusive, IntentionShared) => true, _ => false, } } } /// Lock information #[derive(Debug, Clone)] pub struct Lock { pub transaction_id: u64, pub mode: LockMode, pub acquired_at: SystemTime, } /// Consistency checkpoint for recovery #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ConsistencyCheckpoint { 
pub checkpoint_id: u64, pub timestamp: u64, pub committed_transactions: Vec<u64>, pub vector_checksums: HashMap<NodeId, u64>, pub metadata_checksum: u64, } /// Consistency manager for vector storage #[derive(Clone)] pub struct ConsistencyManager { /// Active transactions active_transactions: Arc<RwLock<HashMap<u64, Transaction>>>, /// Lock table for fine-grained concurrency control lock_table: Arc<RwLock<HashMap<NodeId, Vec<Lock>>>>, /// Transaction log for recovery transaction_log: Arc<Mutex<Vec<TransactionLogEntry>>>, /// Next transaction ID next_transaction_id: Arc<RwLock<u64>>, /// Committed transaction IDs for visibility committed_transactions: Arc<RwLock<HashSet<u64>>>, /// Checkpoints for recovery checkpoints: Arc<RwLock<Vec<ConsistencyCheckpoint>>>, /// Configuration config: ConsistencyConfig, /// Notification channel for transaction commits commit_notifier: Arc<watch::Sender<u64>>, _commit_receiver: watch::Receiver<u64>, } #[derive(Debug, Clone)] pub struct ConsistencyConfig { /// Maximum number of active transactions pub max_active_transactions: usize, /// Transaction timeout pub transaction_timeout: Duration, /// Checkpoint interval pub checkpoint_interval: Duration, /// Maximum log entries before forced checkpoint pub max_log_entries: usize, /// Lock timeout pub lock_timeout: Duration, /// Enable deadlock detection pub enable_deadlock_detection: bool, /// Deadlock detection interval pub deadlock_detection_interval: Duration, } impl Default for ConsistencyConfig { fn default() -> Self { Self { max_active_transactions: 1000, transaction_timeout: Duration::from_secs(30), checkpoint_interval: Duration::from_secs(300), // 5 minutes max_log_entries: 10000, lock_timeout: Duration::from_secs(10), enable_deadlock_detection: true, deadlock_detection_interval: Duration::from_secs(5), } } } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TransactionLogEntry { pub transaction_id: u64, pub operation: VectorOperation, pub timestamp: u64, pub state: 
TransactionState, } impl ConsistencyManager { pub fn new(config: ConsistencyConfig) -> Self { let (commit_sender, commit_receiver) = watch::channel(0); let manager = Self { active_transactions: Arc::new(RwLock::new(HashMap::new())), lock_table: Arc::new(RwLock::new(HashMap::new())), transaction_log: Arc::new(Mutex::new(Vec::new())), next_transaction_id: Arc::new(RwLock::new(1)), committed_transactions: Arc::new(RwLock::new(HashSet::new())), checkpoints: Arc::new(RwLock::new(Vec::new())), config: config.clone(), commit_notifier: Arc::new(commit_sender), _commit_receiver: commit_receiver, }; // Start background tasks if config.enable_deadlock_detection { manager.start_deadlock_detector(); } manager.start_checkpoint_manager(); manager.start_cleanup_task(); manager } /// Begin a new transaction pub fn begin_transaction(&self, isolation_level: IsolationLevel) -> Result<u64> { let transaction_id = { let mut next_id = self.next_transaction_id.write(); let id = *next_id; *next_id += 1; id }; let mut active_txns = self.active_transactions.write(); // Check transaction limit if active_txns.len() >= self.config.max_active_transactions { return Err(CodeGraphError::Vector( "Maximum number of active transactions reached".to_string(), )); } let transaction = Transaction::new(transaction_id, isolation_level); active_txns.insert(transaction_id, transaction); debug!( "Started transaction {} with isolation level {:?}", transaction_id, isolation_level ); Ok(transaction_id) } /// Add an operation to a transaction pub fn add_operation(&self, transaction_id: u64, operation: VectorOperation) -> Result<()> { let mut active_txns = self.active_transactions.write(); let transaction = active_txns .get_mut(&transaction_id) .ok_or_else(|| CodeGraphError::Vector("Transaction not found".to_string()))?; if transaction.state != TransactionState::Active { return Err(CodeGraphError::Vector( "Transaction is not active".to_string(), )); } if transaction.is_expired() { transaction.state = 
TransactionState::Failed; return Err(CodeGraphError::Vector( "Transaction has expired".to_string(), )); } // Check for conflicts with other transactions let transaction_clone = transaction.clone(); drop(transaction); // Release mutable borrow let conflicts = active_txns .values() .filter(|other| other.id != transaction_id && other.conflicts_with(&transaction_clone)) .count(); let transaction = active_txns .get_mut(&transaction_id) .ok_or_else(|| CodeGraphError::Vector("Transaction not found".to_string()))?; if conflicts > 0 { match transaction.isolation_level { IsolationLevel::Serializable => { return Err(CodeGraphError::Vector( "Serialization conflict detected".to_string(), )); } _ => { // For lower isolation levels, we might wait or proceed warn!( "Conflict detected in transaction {} but proceeding due to isolation level", transaction_id ); } } } transaction.add_operation(operation.clone()); // Log the operation let log_entry = TransactionLogEntry { transaction_id, operation, timestamp: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), state: TransactionState::Active, }; self.transaction_log.lock().push(log_entry); Ok(()) } /// Acquire a lock on a node pub async fn acquire_lock( &self, transaction_id: u64, node_id: NodeId, mode: LockMode, ) -> Result<()> { let timeout = tokio::time::sleep(self.config.lock_timeout); tokio::pin!(timeout); loop { tokio::select! 
{ _ = &mut timeout => { return Err(CodeGraphError::Vector("Lock acquisition timeout".to_string())); } result = async { self.try_acquire_lock(transaction_id, node_id, mode) } => { match result { Ok(()) => return Ok(()), Err(_) => { // Wait a bit before retrying tokio::time::sleep(Duration::from_millis(10)).await; } } } } } } fn try_acquire_lock(&self, transaction_id: u64, node_id: NodeId, mode: LockMode) -> Result<()> { let mut lock_table = self.lock_table.write(); let locks = lock_table.entry(node_id).or_insert_with(Vec::new); // Check if any existing locks are incompatible for existing_lock in locks.iter() { if existing_lock.transaction_id != transaction_id && !mode.is_compatible(existing_lock.mode) { return Err(CodeGraphError::Vector("Lock conflict".to_string())); } } // Check if we already have a compatible or stronger lock if let Some(existing) = locks .iter_mut() .find(|l| l.transaction_id == transaction_id) { if mode.is_stronger_than(existing.mode) { existing.mode = mode; existing.acquired_at = SystemTime::now(); } } else { // Acquire new lock locks.push(Lock { transaction_id, mode, acquired_at: SystemTime::now(), }); } debug!( "Acquired {:?} lock on node {} for transaction {}", mode, node_id, transaction_id ); Ok(()) } /// Release all locks held by a transaction pub fn release_locks(&self, transaction_id: u64) { let mut lock_table = self.lock_table.write(); for locks in lock_table.values_mut() { locks.retain(|lock| lock.transaction_id != transaction_id); } // Remove empty entries lock_table.retain(|_, locks| !locks.is_empty()); debug!("Released all locks for transaction {}", transaction_id); } /// Prepare a transaction for two-phase commit pub fn prepare_transaction(&self, transaction_id: u64) -> Result<()> { let mut active_txns = self.active_transactions.write(); let transaction = active_txns .get_mut(&transaction_id) .ok_or_else(|| CodeGraphError::Vector("Transaction not found".to_string()))?; if transaction.state != TransactionState::Active { return 
Err(CodeGraphError::Vector( "Transaction is not active".to_string(), )); } if transaction.is_expired() { transaction.state = TransactionState::Failed; return Err(CodeGraphError::Vector( "Transaction has expired".to_string(), )); } transaction.state = TransactionState::Preparing; // Validate transaction can be committed let validation_result = self.validate_transaction(transaction); if validation_result.is_ok() { transaction.state = TransactionState::Prepared; info!("Prepared transaction {} for commit", transaction_id); } else { transaction.state = TransactionState::Failed; return validation_result; } Ok(()) } /// Commit a prepared transaction pub fn commit_transaction(&self, transaction_id: u64) -> Result<()> { let mut active_txns = self.active_transactions.write(); let mut transaction = active_txns .remove(&transaction_id) .ok_or_else(|| CodeGraphError::Vector("Transaction not found".to_string()))?; if transaction.state != TransactionState::Prepared { return Err(CodeGraphError::Vector( "Transaction is not prepared".to_string(), )); } transaction.state = TransactionState::Committed; transaction.commit_time = Some(SystemTime::now()); // Add to committed transactions { let mut committed = self.committed_transactions.write(); committed.insert(transaction_id); } // Log commit let log_entry = TransactionLogEntry { transaction_id, operation: VectorOperation::Insert { node_id: NodeId::nil(), vector: vec![], }, // Dummy operation timestamp: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), state: TransactionState::Committed, }; self.transaction_log.lock().push(log_entry); // Release locks self.release_locks(transaction_id); // Notify other transactions let _ = self.commit_notifier.send(transaction_id); info!("Committed transaction {}", transaction_id); Ok(()) } /// Abort a transaction pub fn abort_transaction(&self, transaction_id: u64) -> Result<Vec<VectorOperation>> { let mut active_txns = self.active_transactions.write(); let mut transaction = 
active_txns .remove(&transaction_id) .ok_or_else(|| CodeGraphError::Vector("Transaction not found".to_string()))?; transaction.state = TransactionState::Aborted; // Generate rollback operations let rollback_operations: Vec<VectorOperation> = transaction .operations .iter() .rev() // Reverse order for rollback .filter_map(|op| op.inverse()) .collect(); // Log abort let log_entry = TransactionLogEntry { transaction_id, operation: VectorOperation::Delete { node_id: NodeId::nil(), vector: None, }, // Dummy operation timestamp: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), state: TransactionState::Aborted, }; self.transaction_log.lock().push(log_entry); // Release locks self.release_locks(transaction_id); warn!("Aborted transaction {}", transaction_id); Ok(rollback_operations) } /// Validate a transaction for commit fn validate_transaction(&self, transaction: &Transaction) -> Result<()> { // Check for expired transaction if transaction.is_expired() { return Err(CodeGraphError::Vector( "Transaction has expired".to_string(), )); } // Check for conflicts based on isolation level match transaction.isolation_level { IsolationLevel::Serializable => { // Strict validation - check all read/write sets let active_txns = self.active_transactions.read(); let conflicts = active_txns .values() .filter(|other| other.id != transaction.id && transaction.conflicts_with(other)) .count(); if conflicts > 0 { return Err(CodeGraphError::Vector( "Serialization conflict detected".to_string(), )); } } _ => { // Less strict validation for lower isolation levels } } Ok(()) } /// Check if a node is visible to a transaction pub fn is_visible( &self, node_id: NodeId, transaction_id: u64, committed_by: Option<u64>, ) -> bool { let active_txns = self.active_transactions.read(); let transaction = match active_txns.get(&transaction_id) { Some(txn) => txn, None => return true, // Transaction not found, assume visible }; match transaction.isolation_level { 
IsolationLevel::ReadUncommitted => true, // See all changes IsolationLevel::ReadCommitted => { // Only see committed changes committed_by.map_or(true, |commit_txn| { let committed = self.committed_transactions.read(); committed.contains(&commit_txn) }) } IsolationLevel::RepeatableRead | IsolationLevel::Serializable => { // Only see changes committed before this transaction started committed_by.map_or(true, |commit_txn| { let committed = self.committed_transactions.read(); committed.contains(&commit_txn) && commit_txn < transaction_id }) } } } /// Create a consistency checkpoint pub fn create_checkpoint(&self) -> Result<ConsistencyCheckpoint> { let checkpoint_id = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); let committed_txns: Vec<u64> = { let committed = self.committed_transactions.read(); committed.iter().cloned().collect() }; // For now, create empty checksums - in production this would // calculate actual checksums of all vectors let vector_checksums = HashMap::new(); let metadata_checksum = 0; let checkpoint = ConsistencyCheckpoint { checkpoint_id, timestamp: checkpoint_id, committed_transactions: committed_txns, vector_checksums, metadata_checksum, }; { let mut checkpoints = self.checkpoints.write(); checkpoints.push(checkpoint.clone()); // Keep only recent checkpoints checkpoints.sort_by_key(|cp| cp.timestamp); let checkpoint_count = checkpoints.len(); if checkpoint_count > 10 { checkpoints.drain(0..checkpoint_count - 10); } } info!("Created consistency checkpoint {}", checkpoint_id); Ok(checkpoint) } /// Get the latest checkpoint pub fn get_latest_checkpoint(&self) -> Option<ConsistencyCheckpoint> { let checkpoints = self.checkpoints.read(); checkpoints.last().cloned() } /// Start deadlock detection task fn start_deadlock_detector(&self) { let active_transactions = Arc::clone(&self.active_transactions); let lock_table = Arc::clone(&self.lock_table); let interval = self.config.deadlock_detection_interval; tokio::spawn(async move { let 
mut interval_timer = tokio::time::interval(interval); loop { interval_timer.tick().await; // Simple deadlock detection using wait-for graph let deadlocks = Self::detect_deadlocks(&active_transactions, &lock_table); if !deadlocks.is_empty() { warn!("Detected {} deadlocks", deadlocks.len()); // In a real implementation, we would resolve deadlocks // by aborting one of the transactions in each cycle } } }); } /// Detect deadlocks using wait-for graph fn detect_deadlocks( active_transactions: &Arc<RwLock<HashMap<u64, Transaction>>>, lock_table: &Arc<RwLock<HashMap<NodeId, Vec<Lock>>>>, ) -> Vec<Vec<u64>> { let _active_txns = active_transactions.read(); let _locks = lock_table.read(); // Simplified deadlock detection - in production this would // build a proper wait-for graph and detect cycles Vec::new() } /// Start checkpoint manager task fn start_checkpoint_manager(&self) { let manager = self.clone(); let interval = manager.config.checkpoint_interval; let max_log_entries = manager.config.max_log_entries; tokio::spawn(async move { let mut interval_timer = tokio::time::interval(interval); loop { interval_timer.tick().await; let should_checkpoint = { let log = manager.transaction_log.lock(); log.len() >= max_log_entries }; if should_checkpoint { if let Err(e) = manager.create_checkpoint() { error!("Failed to create checkpoint: {}", e); } else { // Clear old log entries after successful checkpoint let mut log = manager.transaction_log.lock(); log.clear(); } } } }); } /// Start cleanup task for expired transactions fn start_cleanup_task(&self) { let active_transactions = Arc::clone(&self.active_transactions); let cleanup_interval = Duration::from_secs(60); // Clean up every minute tokio::spawn(async move { let mut interval_timer = tokio::time::interval(cleanup_interval); loop { interval_timer.tick().await; let expired_txns: Vec<u64> = { let active_txns = active_transactions.read(); active_txns .values() .filter(|txn| txn.is_expired()) .map(|txn| txn.id) .collect() }; if 
!expired_txns.is_empty() { warn!("Cleaning up {} expired transactions", expired_txns.len()); let mut active_txns = active_transactions.write(); for txn_id in expired_txns { if let Some(mut txn) = active_txns.remove(&txn_id) { txn.state = TransactionState::Failed; } } } } }); } /// Get statistics about active transactions pub fn get_transaction_stats(&self) -> TransactionStats { let active_txns = self.active_transactions.read(); let committed_txns = self.committed_transactions.read(); let log = self.transaction_log.lock(); let active_count = active_txns.len(); let committed_count = committed_txns.len(); let log_entries = log.len(); let isolation_level_counts = { let mut counts = HashMap::new(); for txn in active_txns.values() { *counts.entry(txn.isolation_level).or_insert(0) += 1; } counts }; TransactionStats { active_transactions: active_count, committed_transactions: committed_count, log_entries, isolation_level_counts, } } } /// Transaction statistics #[derive(Debug, Clone)] pub struct TransactionStats { pub active_transactions: usize, pub committed_transactions: usize, pub log_entries: usize, pub isolation_level_counts: HashMap<IsolationLevel, usize>, } #[cfg(test)] mod tests { use super::*; use tokio_test; #[tokio::test] async fn test_transaction_lifecycle() { let config = ConsistencyConfig::default(); let manager = ConsistencyManager::new(config); // Begin transaction let txn_id = manager .begin_transaction(IsolationLevel::ReadCommitted) .unwrap(); assert!(txn_id > 0); // Add operations let nid = NodeId::new_v4(); let operation = VectorOperation::Insert { node_id: nid, vector: vec![1.0, 2.0, 3.0], }; manager.add_operation(txn_id, operation).unwrap(); // Prepare transaction manager.prepare_transaction(txn_id).unwrap(); // Commit transaction manager.commit_transaction(txn_id).unwrap(); let stats = manager.get_transaction_stats(); assert_eq!(stats.committed_transactions, 1); } #[tokio::test] async fn test_lock_acquisition() { let config = 
ConsistencyConfig::default(); let manager = ConsistencyManager::new(config); let txn_id = manager .begin_transaction(IsolationLevel::ReadCommitted) .unwrap(); // Acquire shared lock let nid = NodeId::new_v4(); manager .acquire_lock(txn_id, nid, LockMode::Shared) .await .unwrap(); // Try to acquire incompatible lock from different transaction let txn_id2 = manager .begin_transaction(IsolationLevel::ReadCommitted) .unwrap(); let result = tokio::time::timeout( Duration::from_millis(100), manager.acquire_lock(txn_id2, nid, LockMode::Exclusive), ) .await; assert!(result.is_err()); // Should timeout due to lock conflict } #[tokio::test] async fn test_transaction_abort() { let config = ConsistencyConfig::default(); let manager = ConsistencyManager::new(config); let txn_id = manager .begin_transaction(IsolationLevel::ReadCommitted) .unwrap(); let nid = NodeId::new_v4(); let operation = VectorOperation::Insert { node_id: nid, vector: vec![1.0, 2.0, 3.0], }; manager.add_operation(txn_id, operation).unwrap(); // Abort transaction let rollback_ops = manager.abort_transaction(txn_id).unwrap(); assert_eq!(rollback_ops.len(), 1); match &rollback_ops[0] { VectorOperation::Delete { node_id, .. } => { assert_eq!(*node_id, nid); } _ => panic!("Expected delete operation for rollback"), } } #[tokio::test] async fn test_consistency_checkpoint() { let config = ConsistencyConfig::default(); let manager = ConsistencyManager::new(config); let checkpoint = manager.create_checkpoint().unwrap(); assert!(checkpoint.checkpoint_id > 0); let latest = manager.get_latest_checkpoint().unwrap(); assert_eq!(latest.checkpoint_id, checkpoint.checkpoint_id); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.