Skip to main content
Glama

CodeGraph CLI MCP Server

by Jakedismo
transactional_graph.rs•19.8 kB
#![allow(dead_code, unused_variables, unused_imports)] use async_trait::async_trait; use codegraph_core::{ CodeGraphError, CodeNode, GraphStore, IsolationLevel, NodeId, Result, SnapshotId, TransactionId, TransactionManager, VersionId, VersionedStore, WriteOperation, }; use parking_lot::RwLock; use std::sync::Arc; use crate::VersionedRocksDbStorage; pub struct TransactionalGraph { storage: Arc<RwLock<VersionedRocksDbStorage>>, current_transaction: Option<TransactionId>, isolation_level: IsolationLevel, } impl TransactionalGraph { pub async fn new(storage_path: &str) -> Result<Self> { let storage = VersionedRocksDbStorage::new(storage_path).await?; Ok(Self { storage: Arc::new(RwLock::new(storage)), current_transaction: None, isolation_level: IsolationLevel::ReadCommitted, }) } pub async fn begin_transaction(&mut self) -> Result<TransactionId> { self.begin_transaction_with_isolation(self.isolation_level) .await } pub async fn begin_transaction_with_isolation( &mut self, isolation_level: IsolationLevel, ) -> Result<TransactionId> { if self.current_transaction.is_some() { return Err(CodeGraphError::Transaction( "Transaction already active".to_string(), )); } let transaction_id = { let mut storage = self.storage.write(); storage.begin_transaction(isolation_level).await? 
}; self.current_transaction = Some(transaction_id); self.isolation_level = isolation_level; Ok(transaction_id) } pub async fn commit(&mut self) -> Result<()> { let transaction_id = self .current_transaction .take() .ok_or_else(|| CodeGraphError::Transaction("No active transaction".to_string()))?; // Run commit in a blocking task to avoid holding non-Send guards across .await let storage = self.storage.clone(); let res: Result<()> = tokio::task::spawn_blocking(move || { let handle = tokio::runtime::Handle::current(); handle.block_on(async move { let mut guard = storage.write(); guard.commit_transaction(transaction_id).await }) }) .await .map_err(|e| CodeGraphError::Threading(e.to_string()))?; res } pub async fn rollback(&mut self) -> Result<()> { let transaction_id = self .current_transaction .take() .ok_or_else(|| CodeGraphError::Transaction("No active transaction".to_string()))?; let storage = self.storage.clone(); tokio::task::spawn_blocking(move || { let handle = tokio::runtime::Handle::current(); handle.block_on(async move { let mut guard = storage.write(); guard.rollback_transaction(transaction_id).await }) }) .await .map_err(|e| CodeGraphError::Threading(e.to_string()))? } pub async fn create_savepoint(&self) -> Result<SnapshotId> { let transaction_id = self .current_transaction .ok_or_else(|| CodeGraphError::Transaction("No active transaction".to_string()))?; self.storage.write().create_snapshot(transaction_id).await } pub async fn rollback_to_savepoint(&mut self, snapshot_id: SnapshotId) -> Result<()> { let _transaction_id = self .current_transaction .ok_or_else(|| CodeGraphError::Transaction("No active transaction".to_string()))?; // TODO: Implement rollback to specific savepoint // This would involve: // 1. Identify the write operations performed after the savepoint // 2. Reverse those operations // 3. 
Update the transaction's write set Ok(()) } pub async fn create_version( &mut self, name: String, description: String, author: String, parent_versions: Vec<VersionId>, ) -> Result<VersionId> { let transaction_id = self .current_transaction .ok_or_else(|| CodeGraphError::Transaction("No active transaction".to_string()))?; // First commit the current transaction to create a snapshot let snapshot_id = { let storage = self.storage.clone(); tokio::task::spawn_blocking(move || { let handle = tokio::runtime::Handle::current(); handle.block_on(async move { let mut guard = storage.write(); guard.create_snapshot(transaction_id).await }) }) .await .map_err(|e| CodeGraphError::Threading(e.to_string()))?? }; // Create a version pointing to this snapshot self.storage .write() .create_version(name, description, author, snapshot_id, parent_versions) .await } pub async fn checkout_version(&mut self, version_id: VersionId) -> Result<()> { if self.current_transaction.is_some() { return Err(CodeGraphError::Transaction( "Cannot checkout version with active transaction".to_string(), )); } let storage = self.storage.read(); let version = storage .get_version(version_id) .await? 
.ok_or_else(|| CodeGraphError::Transaction("Version not found".to_string()))?; // TODO: Update the current working state to reflect this version // This would involve setting up the graph to read from the specific snapshot Ok(()) } fn get_current_transaction_id(&self) -> Result<TransactionId> { self.current_transaction .ok_or_else(|| CodeGraphError::Transaction("No active transaction".to_string())) } } #[async_trait] impl GraphStore for TransactionalGraph { async fn add_node(&mut self, node: CodeNode) -> Result<()> { let transaction_id = self.get_current_transaction_id()?; // Generate content hash for the node let content_hash = { let serialized = serde_json::to_vec(&node).map_err(|e| CodeGraphError::Database(e.to_string()))?; use sha2::{Digest, Sha256}; let mut hasher = Sha256::new(); hasher.update(&serialized); format!("{:x}", hasher.finalize()) }; // Add to transaction's write set let write_op = WriteOperation::Insert(node.id); { let storage = self.storage.clone(); let node_id = node.id; let write_op_cloned = write_op.clone(); tokio::task::spawn_blocking(move || { let handle = tokio::runtime::Handle::current(); handle.block_on(async move { let mut guard = storage.write(); guard .add_to_write_set(transaction_id, node_id, write_op_cloned) .await }) }) .await .map_err(|e| CodeGraphError::Threading(e.to_string()))??; } // Store the actual content in the storage with the hash // TODO: Implement content-addressed storage for the node data Ok(()) } async fn get_node(&self, id: NodeId) -> Result<Option<CodeNode>> { let transaction_id = self.get_current_transaction_id()?; // Add to read set for isolation level validation without holding non-Send guards across .await { let storage = self.storage.clone(); tokio::task::spawn_blocking(move || { let handle = tokio::runtime::Handle::current(); handle.block_on(async move { let mut guard = storage.write(); guard.add_to_read_set(transaction_id, id).await }) }) .await .map_err(|e| CodeGraphError::Threading(e.to_string()))??; } // 
Check if this node is in the current transaction's write set first let is_in_write = { let storage = self.storage.clone(); tokio::task::spawn_blocking(move || { let handle = tokio::runtime::Handle::current(); handle.block_on(async move { let guard = storage.read(); Ok::<_, CodeGraphError>(guard.get_transaction(transaction_id).await?) }) }) .await .map_err(|e| CodeGraphError::Threading(e.to_string()))?? }; if let Some(transaction) = is_in_write { if transaction.write_set.contains_key(&id) { // Node is being modified in this transaction // TODO: Return the modified version from the write set } } // If not in write set, read from the snapshot let maybe_tx = { let storage = self.storage.clone(); tokio::task::spawn_blocking(move || { let handle = tokio::runtime::Handle::current(); handle.block_on(async move { let guard = storage.read(); Ok::<_, CodeGraphError>(guard.get_transaction(transaction_id).await?) }) }) .await .map_err(|e| CodeGraphError::Threading(e.to_string()))?? }; if let Some(_transaction) = maybe_tx { // TODO: Read node from the transaction's snapshot // This would involve reading from the snapshot's content store } // For now, return None as placeholder Ok(None) } async fn update_node(&mut self, node: CodeNode) -> Result<()> { let transaction_id = self.get_current_transaction_id()?; // Get the current version of the node to create before/after images let old_node = self.get_node(node.id).await?; let (old_hash, new_hash) = { let old_hash = if let Some(ref old) = old_node { let serialized = serde_json::to_vec(old).map_err(|e| CodeGraphError::Database(e.to_string()))?; use sha2::{Digest, Sha256}; let mut hasher = Sha256::new(); hasher.update(&serialized); format!("{:x}", hasher.finalize()) } else { String::new() }; let new_serialized = serde_json::to_vec(&node).map_err(|e| CodeGraphError::Database(e.to_string()))?; use sha2::{Digest, Sha256}; let mut hasher = Sha256::new(); hasher.update(&new_serialized); let new_hash = format!("{:x}", hasher.finalize()); 
(old_hash, new_hash) }; // Add to transaction's write set let write_op = WriteOperation::Update { old_content_hash: old_hash, new_content_hash: new_hash, }; { let storage = self.storage.clone(); let node_id = node.id; let write_op_cloned = write_op.clone(); tokio::task::spawn_blocking(move || { let handle = tokio::runtime::Handle::current(); handle.block_on(async move { let mut guard = storage.write(); guard .add_to_write_set(transaction_id, node_id, write_op_cloned) .await }) }) .await .map_err(|e| CodeGraphError::Threading(e.to_string()))??; } Ok(()) } async fn remove_node(&mut self, id: NodeId) -> Result<()> { let transaction_id = self.get_current_transaction_id()?; // Add to transaction's write set let write_op = WriteOperation::Delete(id); { let storage = self.storage.clone(); tokio::task::spawn_blocking(move || { let handle = tokio::runtime::Handle::current(); handle.block_on(async move { let mut guard = storage.write(); guard.add_to_write_set(transaction_id, id, write_op).await }) }) .await .map_err(|e| CodeGraphError::Threading(e.to_string()))??; } Ok(()) } async fn find_nodes_by_name(&self, name: &str) -> Result<Vec<CodeNode>> { let _transaction_id = self.get_current_transaction_id()?; // TODO: Implement transactional node search // This would involve: // 1. Reading from the current transaction's snapshot // 2. Applying any pending writes from the transaction // 3. Returning the merged view Ok(Vec::new()) } } pub struct ReadOnlyTransactionalGraph { storage: Arc<RwLock<VersionedRocksDbStorage>>, snapshot_id: SnapshotId, } impl ReadOnlyTransactionalGraph { pub async fn new( storage: Arc<RwLock<VersionedRocksDbStorage>>, snapshot_id: SnapshotId, ) -> Self { Self { storage, snapshot_id, } } pub async fn at_version( storage: Arc<RwLock<VersionedRocksDbStorage>>, version_id: VersionId, ) -> Result<Self> { let snapshot_id = { let storage = storage.read(); let version = storage .get_version(version_id) .await? 
.ok_or_else(|| CodeGraphError::Transaction("Version not found".to_string()))?; version.snapshot_id }; Ok(Self::new(storage, snapshot_id).await) } } #[async_trait] impl GraphStore for ReadOnlyTransactionalGraph { async fn add_node(&mut self, _node: CodeNode) -> Result<()> { Err(CodeGraphError::Transaction( "Cannot modify read-only graph".to_string(), )) } async fn get_node(&self, id: NodeId) -> Result<Option<CodeNode>> { // TODO: Read node from the specific snapshot // This involves looking up the content hash for this node in the snapshot // and then retrieving the content from the content store Ok(None) } async fn update_node(&mut self, _node: CodeNode) -> Result<()> { Err(CodeGraphError::Transaction( "Cannot modify read-only graph".to_string(), )) } async fn remove_node(&mut self, _id: NodeId) -> Result<()> { Err(CodeGraphError::Transaction( "Cannot modify read-only graph".to_string(), )) } async fn find_nodes_by_name(&self, _name: &str) -> Result<Vec<CodeNode>> { // TODO: Implement snapshot-based node search Ok(Vec::new()) } } pub struct ConcurrentTransactionManager { storage: Arc<RwLock<VersionedRocksDbStorage>>, max_concurrent_transactions: usize, deadlock_detection_enabled: bool, } impl ConcurrentTransactionManager { pub fn new( storage: Arc<RwLock<VersionedRocksDbStorage>>, max_concurrent_transactions: usize, ) -> Self { Self { storage, max_concurrent_transactions, deadlock_detection_enabled: true, } } pub async fn create_transaction( &self, isolation_level: IsolationLevel, ) -> Result<TransactionalGraph> { let mut graph = TransactionalGraph { storage: self.storage.clone(), current_transaction: None, isolation_level, }; graph .begin_transaction_with_isolation(isolation_level) .await?; Ok(graph) } pub async fn create_read_only_transaction_at_version( &self, version_id: VersionId, ) -> Result<ReadOnlyTransactionalGraph> { ReadOnlyTransactionalGraph::at_version(self.storage.clone(), version_id).await } pub async fn detect_deadlocks(&self) -> 
Result<Vec<TransactionId>> { if !self.deadlock_detection_enabled { return Ok(Vec::new()); } // TODO: Implement deadlock detection algorithm // This would involve: // 1. Building a wait-for graph of transactions // 2. Detecting cycles in the graph // 3. Selecting victim transactions to abort Ok(Vec::new()) } pub async fn get_transaction_statistics(&self) -> Result<TransactionStatistics> { // TODO: Collect statistics about active transactions Ok(TransactionStatistics { active_transactions: 0, committed_transactions: 0, aborted_transactions: 0, average_commit_time_ms: 0.0, deadlocks_detected: 0, }) } } #[derive(Debug, Clone)] pub struct TransactionStatistics { pub active_transactions: usize, pub committed_transactions: u64, pub aborted_transactions: u64, pub average_commit_time_ms: f64, pub deadlocks_detected: u64, } #[cfg(test)] mod tests { use super::*; use tempfile::tempdir; use tokio_test; #[tokio::test] async fn test_basic_transaction() -> Result<()> { let temp_dir = tempdir().unwrap(); let mut graph = TransactionalGraph::new(temp_dir.path().to_str().unwrap()).await?; // Begin transaction let tx_id = graph.begin_transaction().await?; assert!(graph.current_transaction.is_some()); // Add a node let node = CodeNode::new( "test_function".to_string(), Some(codegraph_core::NodeType::Function), Some(codegraph_core::Language::Rust), codegraph_core::Location { file_path: "test.rs".to_string(), line: 1, column: 0, end_line: Some(10), end_column: Some(0), }, ); graph.add_node(node.clone()).await?; // Commit transaction graph.commit().await?; assert!(graph.current_transaction.is_none()); Ok(()) } #[tokio::test] async fn test_rollback_transaction() -> Result<()> { let temp_dir = tempdir().unwrap(); let mut graph = TransactionalGraph::new(temp_dir.path().to_str().unwrap()).await?; // Begin transaction graph.begin_transaction().await?; // Add a node let node = CodeNode::new( "test_function".to_string(), Some(codegraph_core::NodeType::Function), 
Some(codegraph_core::Language::Rust), codegraph_core::Location { file_path: "test.rs".to_string(), line: 1, column: 0, end_line: Some(10), end_column: Some(0), }, ); graph.add_node(node.clone()).await?; // Rollback transaction graph.rollback().await?; assert!(graph.current_transaction.is_none()); Ok(()) } #[tokio::test] async fn test_isolation_levels() -> Result<()> { let temp_dir = tempdir().unwrap(); let mut graph1 = TransactionalGraph::new(temp_dir.path().to_str().unwrap()).await?; let mut graph2 = TransactionalGraph::new(temp_dir.path().to_str().unwrap()).await?; // Test READ_COMMITTED isolation graph1 .begin_transaction_with_isolation(IsolationLevel::ReadCommitted) .await?; graph2 .begin_transaction_with_isolation(IsolationLevel::ReadCommitted) .await?; // TODO: Add tests for different isolation behaviors graph1.commit().await?; graph2.commit().await?; Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.