Skip to main content
Glama

CodeGraph CLI MCP Server

by Jakedismo
updater.rs (24.5 kB)
use crate::integration::graph_vector::{GraphVectorIntegrator, SnippetExtractor}; use crate::traits::{CodeParser, GraphStore}; use crate::{CodeNode, Language, NodeId, NodeType, Result}; use dashmap::DashMap; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::Mutex; use tracing::warn; /// Unique identity of a code element within a file for diffing purposes. /// We include name, type, language and file path to minimize collisions. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct NodeKey { pub file_path: String, pub name: String, pub node_type: Option<NodeType>, pub language: Option<Language>, } impl NodeKey { pub fn from_node(n: &CodeNode) -> Self { Self { file_path: n.location.file_path.clone(), name: n.name.as_str().to_string(), node_type: n.node_type.clone(), language: n.language.clone(), } } } impl Hash for NodeKey { fn hash<H: Hasher>(&self, state: &mut H) { self.file_path.hash(state); self.name.hash(state); if let Some(t) = &self.node_type { t.hash(state); } if let Some(l) = &self.language { l.hash(state); } } } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NodeSig { pub node_id: NodeId, pub signature: u64, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FileAstSnapshot { pub file_path: String, pub updated_at: std::time::SystemTime, pub entries: HashMap<NodeKey, NodeSig>, } #[derive(Debug, Default)] pub struct IncrementalCache { // file -> snapshot by_file: DashMap<String, Arc<RwLock<FileAstSnapshot>>>, // node -> file (for fast invalidation) node_to_file: DashMap<NodeId, String>, // capacity limits (simple LRU-like eviction by timestamp) max_files: usize, } impl IncrementalCache { pub fn new(max_files: usize) -> Self { Self { by_file: DashMap::new(), node_to_file: DashMap::new(), max_files, } } pub fn get(&self, file: &str) -> 
Option<Arc<RwLock<FileAstSnapshot>>> { self.by_file.get(file).map(|e| e.clone()) } pub fn insert(&self, snapshot: FileAstSnapshot) { let file = snapshot.file_path.clone(); for (_k, v) in snapshot.entries.iter() { self.node_to_file.insert(v.node_id, file.clone()); } self.by_file .insert(file.clone(), Arc::new(RwLock::new(snapshot))); self.evict_if_needed(); } pub fn invalidate_file(&self, file: &str) { if let Some(s) = self.by_file.remove(file) { let snap = s.1.read(); for (_k, v) in snap.entries.iter() { self.node_to_file.remove(&v.node_id); } } } pub fn invalidate_node(&self, node: NodeId) { if let Some(file) = self.node_to_file.get(&node).map(|e| e.clone()) { if let Some(s) = self.by_file.get(&file) { let mut snap = s.write(); let old_keys: Vec<NodeKey> = snap .entries .iter() .filter(|(_k, v)| v.node_id == node) .map(|(k, _)| k.clone()) .collect(); for k in old_keys { snap.entries.remove(&k); } } self.node_to_file.remove(&node); } } pub fn clear(&self) { self.by_file.clear(); self.node_to_file.clear(); } fn evict_if_needed(&self) { if self.by_file.len() <= self.max_files { return; } // naive eviction: remove the oldest by updated_at let mut items: Vec<(String, std::time::SystemTime)> = Vec::with_capacity(self.by_file.len()); for e in self.by_file.iter() { items.push((e.key().clone(), e.value().read().updated_at)); } items.sort_by_key(|(_f, t)| *t); let to_remove = items.len().saturating_sub(self.max_files); for i in 0..to_remove { self.invalidate_file(&items[i].0); } } } #[derive(Debug, Clone)] pub struct AstDiff { pub added: Vec<CodeNode>, pub changed: Vec<(NodeId, CodeNode)>, // (old_id, new_node_with_preserved_id) pub removed: Vec<NodeId>, } impl AstDiff { pub fn empty() -> Self { Self { added: vec![], changed: vec![], removed: vec![], } } } #[derive(Debug)] pub struct UpdateResult { pub file_path: String, pub added: usize, pub changed: usize, pub removed: usize, pub duration: Duration, } /// Tracks applied operations so we can rollback on failure. 
#[derive(Default)] struct UpdateTransactionLog { added: Vec<NodeId>, updated_prev: Vec<CodeNode>, removed_prev: Vec<CodeNode>, } /// Incremental updater orchestrating parser, graph, vector and cache for a single file. pub struct IncrementalUpdater { parser: Arc<dyn CodeParser>, graph: Arc<Mutex<dyn GraphStore + Send>>, // serialized graph ops vector: Option<Arc<GraphVectorIntegrator>>, // vector sync (optional) cache: Arc<IncrementalCache>, extractor: SnippetExtractor, // for robust signatures } impl IncrementalUpdater { pub fn new( parser: Arc<dyn CodeParser>, graph: Arc<Mutex<dyn GraphStore + Send>>, vector: Option<Arc<GraphVectorIntegrator>>, ) -> Self { Self { parser, graph, vector, cache: Arc::new(IncrementalCache::new(100_000)), extractor: SnippetExtractor::default(), } } pub fn with_cache_capacity(mut self, max_files: usize) -> Self { self.cache = Arc::new(IncrementalCache::new(max_files)); self } pub fn cache(&self) -> Arc<IncrementalCache> { self.cache.clone() } pub fn extractor_mut(&mut self) -> &mut SnippetExtractor { &mut self.extractor } /// Compute a stable content signature for a node (used for change detection and vector updates). 
fn signature(&self, node: &CodeNode) -> u64 { let mut s = std::collections::hash_map::DefaultHasher::new(); // Compose feature-rich content, prefer explicit content or extracted snippet if let Some(c) = &node.content { c.as_str().hash(&mut s); } else { let snip = self.extractor.extract(node); snip.hash(&mut s); } // Also include key attributes to reduce accidental collisions NodeKey::from_node(node).hash(&mut s); s.finish() } #[allow(dead_code)] fn build_snapshot(&self, file_path: &str, nodes: &[CodeNode]) -> FileAstSnapshot { let mut entries = HashMap::with_capacity(nodes.len()); for n in nodes { let key = NodeKey::from_node(n); let sig = self.signature(n); entries.insert( key, NodeSig { node_id: n.id, signature: sig, }, ); } FileAstSnapshot { file_path: file_path.to_string(), updated_at: std::time::SystemTime::now(), entries, } } fn diff_snapshots( &self, old: Option<&FileAstSnapshot>, new_nodes: &mut Vec<CodeNode>, ) -> AstDiff { let mut diff = AstDiff::empty(); let mut new_pairs: Vec<(NodeKey, CodeNode)> = Vec::with_capacity(new_nodes.len()); for n in new_nodes.drain(..) { let k = NodeKey::from_node(&n); new_pairs.push((k, n)); } let new_key_set: HashSet<NodeKey> = new_pairs.iter().map(|(k, _)| k.clone()).collect(); if let Some(old_snap) = old { for (k, mut new_node) in new_pairs.into_iter() { match old_snap.entries.get(&k) { None => { diff.added.push(new_node); } Some(v) => { let sig_new = self.signature(&new_node); if sig_new != v.signature { new_node.id = v.node_id; diff.changed.push((v.node_id, new_node)); } } } } for (k, v) in old_snap.entries.iter() { if !new_key_set.contains(k) { diff.removed.push(v.node_id); } } } else { diff.added.extend(new_pairs.into_iter().map(|(_, n)| n)); } diff } /// Update a single file: parse, diff, apply selective graph updates, vector maintenance, cache update. 
pub async fn update_file(&self, file_path: &str) -> Result<UpdateResult> { let start = Instant::now(); // Parse new AST nodes let mut nodes = self.parser.parse_file(file_path).await?; // Load old snapshot (if any) let old_snapshot_arc = self.cache.get(file_path); let old_snapshot = old_snapshot_arc.as_ref().map(|a| a.read().clone()); // Diff let diff = self.diff_snapshots(old_snapshot.as_ref(), &mut nodes); // Apply updates with rollback log let mut txlog = UpdateTransactionLog::default(); if let Err(e) = self.apply_diff(file_path, &diff, &mut txlog).await { warn!("apply_diff failed: {} - rolling back", e); if let Err(rb) = self.rollback(&txlog).await { warn!("rollback also failed: {}", rb); } return Err(e); } // Vector maintenance (best-effort; do not rollback graph if vectors fail) if let Some(v) = &self.vector { // Build the changed+added payload let mut changed_nodes: Vec<CodeNode> = Vec::with_capacity(diff.changed.len() + diff.added.len()); changed_nodes.extend(diff.changed.iter().map(|(_id, n)| n.clone())); changed_nodes.extend(diff.added.iter().cloned()); let deleted_ids: Vec<NodeId> = diff.removed.clone(); if let Err(ve) = v.sync_changes(&changed_nodes, &deleted_ids).await { warn!("vector sync failed (non-fatal): {}", ve); } } // Update cache snapshot by transforming previous entries let mut new_entries: HashMap<NodeKey, NodeSig> = old_snapshot .as_ref() .map(|s| s.entries.clone()) .unwrap_or_default(); // removals let removed_set: HashSet<NodeId> = diff.removed.iter().cloned().collect(); new_entries.retain(|_k, v| !removed_set.contains(&v.node_id)); // changes and additions for (_old, n) in diff.changed.iter() { let key = NodeKey::from_node(n); new_entries.insert( key, NodeSig { node_id: n.id, signature: self.signature(n), }, ); } for n in diff.added.iter() { let key = NodeKey::from_node(n); new_entries.insert( key, NodeSig { node_id: n.id, signature: self.signature(n), }, ); } self.cache.insert(FileAstSnapshot { file_path: file_path.to_string(), 
updated_at: std::time::SystemTime::now(), entries: new_entries, }); Ok(UpdateResult { file_path: file_path.to_string(), added: diff.added.len(), changed: diff.changed.len(), removed: diff.removed.len(), duration: start.elapsed(), }) } async fn apply_diff( &self, _file_path: &str, diff: &AstDiff, txlog: &mut UpdateTransactionLog, ) -> Result<()> { let mut graph = self.graph.lock().await; // Apply changed: preserve NodeId, update node for (old_id, mut new_node) in diff.changed.iter().cloned() { // backup existing if let Some(prev) = graph.get_node(old_id).await? { txlog.updated_prev.push(prev.clone()); } new_node.id = old_id; // ensure id is stable graph.update_node(new_node.clone()).await?; } // Apply added for n in diff.added.iter().cloned() { graph.add_node(n.clone()).await?; txlog.added.push(n.id); } // Apply removed for id in diff.removed.iter().cloned() { if let Some(prev) = graph.get_node(id).await? { txlog.removed_prev.push(prev.clone()); } graph.remove_node(id).await?; } Ok(()) } async fn rollback(&self, txlog: &UpdateTransactionLog) -> Result<()> { let mut graph = self.graph.lock().await; // 1) Re-add removed nodes for n in txlog.removed_prev.iter().cloned() { let _ = graph.add_node(n).await; } // 2) Revert updates for n in txlog.updated_prev.iter().cloned() { let _ = graph.update_node(n).await; } // 3) Remove added nodes for id in txlog.added.iter().cloned() { let _ = graph.remove_node(id).await; } Ok(()) } // Cache invalidation helpers pub fn invalidate_file(&self, file: &str) { self.cache.invalidate_file(file); } pub fn invalidate_node(&self, id: NodeId) { self.cache.invalidate_node(id); } pub fn clear_cache(&self) { self.cache.clear(); } } // ----------------------------- // Tests // ----------------------------- #[cfg(test)] mod tests { use super::*; use crate::traits::VectorStore; use crate::CodeGraphError; use async_trait::async_trait; use crossbeam_channel::{unbounded, Receiver, Sender}; use tokio_test::block_on; struct InMemoryParser { files: 
DashMap<String, Vec<CodeNode>>, } #[async_trait] impl CodeParser for InMemoryParser { async fn parse_file(&self, file_path: &str) -> Result<Vec<CodeNode>> { Ok(self .files .get(file_path) .map(|e| e.clone()) .unwrap_or_default()) } fn supported_languages(&self) -> Vec<crate::Language> { vec![Language::Rust] } } struct InMemoryGraph { nodes: DashMap<NodeId, CodeNode>, } #[async_trait] impl GraphStore for InMemoryGraph { async fn add_node(&mut self, node: CodeNode) -> Result<()> { self.nodes.insert(node.id, node); Ok(()) } async fn get_node(&self, id: NodeId) -> Result<Option<CodeNode>> { Ok(self.nodes.get(&id).map(|e| e.clone())) } async fn update_node(&mut self, node: CodeNode) -> Result<()> { self.nodes.insert(node.id, node); Ok(()) } async fn remove_node(&mut self, id: NodeId) -> Result<()> { self.nodes.remove(&id); Ok(()) } async fn find_nodes_by_name(&self, name: &str) -> Result<Vec<CodeNode>> { Ok(self .nodes .iter() .filter(|e| e.name.as_str() == name) .map(|e| e.clone()) .collect()) } } struct InMemoryVectorStore { embs: DashMap<NodeId, Vec<f32>>, } #[async_trait] impl VectorStore for InMemoryVectorStore { async fn store_embeddings(&mut self, nodes: &[CodeNode]) -> Result<()> { for n in nodes { if let Some(e) = &n.embedding { self.embs.insert(n.id, e.clone()); } } Ok(()) } async fn search_similar( &self, _query_embedding: &[f32], _limit: usize, ) -> Result<Vec<NodeId>> { Ok(vec![]) } async fn get_embedding(&self, node_id: NodeId) -> Result<Option<Vec<f32>>> { Ok(self.embs.get(&node_id).map(|e| e.clone())) } } fn make_node(file: &str, name: &str, content: &str) -> CodeNode { let now = chrono::Utc::now(); CodeNode { id: NodeId::new_v4(), name: name.into(), node_type: Some(NodeType::Function), language: Some(Language::Rust), location: crate::Location { file_path: file.into(), line: 1, column: 1, end_line: Some(1), end_column: Some(10), }, content: Some(content.into()), metadata: crate::Metadata { attributes: Default::default(), created_at: now, updated_at: now, 
}, embedding: None, complexity: None, } } #[tokio::test] async fn detect_add_change_remove() { let parser = Arc::new(InMemoryParser { files: DashMap::new(), }); let graph = Arc::new(Mutex::new(InMemoryGraph { nodes: DashMap::new(), })); let vector = None; let updater = IncrementalUpdater::new(parser.clone(), graph.clone(), vector); // initial file with two functions let f = "a.rs"; let a1 = make_node(f, "foo", "fn foo()->i32{1}"); let b1 = make_node(f, "bar", "fn bar()->i32{2}"); parser.files.insert(f.into(), vec![a1.clone(), b1.clone()]); let r1 = updater.update_file(f).await.unwrap(); assert_eq!(r1.added, 2); assert_eq!(r1.changed, 0); assert_eq!(r1.removed, 0); // modify foo, remove bar, add baz let mut a2 = a1.clone(); a2.content = Some("fn foo()->i32{3}".into()); let c1 = make_node(f, "baz", "fn baz()->i32{4}"); parser.files.insert(f.into(), vec![a2.clone(), c1.clone()]); let r2 = updater.update_file(f).await.unwrap(); assert_eq!(r2.added, 1); // baz assert_eq!(r2.changed, 1); // foo assert_eq!(r2.removed, 1); // bar } #[tokio::test] async fn preserves_node_id_on_change() { let parser = Arc::new(InMemoryParser { files: DashMap::new(), }); let graph = Arc::new(Mutex::new(InMemoryGraph { nodes: DashMap::new(), })); let updater = IncrementalUpdater::new(parser.clone(), graph.clone(), None); let f = "b.rs"; let a1 = make_node(f, "foo", "fn foo()->i32{1}"); parser.files.insert(f.into(), vec![a1.clone()]); updater.update_file(f).await.unwrap(); // change content, keep identity let mut a2 = a1.clone(); a2.content = Some("fn foo()->i32{42}".into()); parser.files.insert(f.into(), vec![a2.clone()]); updater.update_file(f).await.unwrap(); let g = graph.lock().await; let cur = g.get_node(a1.id).await.unwrap().unwrap(); assert_eq!(cur.id, a1.id); assert_eq!(cur.content.as_ref().unwrap().as_str(), "fn foo()->i32{42}"); } #[tokio::test] async fn rollback_on_failure_reverts_changes() { // Graph that fails on update to simulate an error struct FailingGraph { nodes: 
DashMap<NodeId, CodeNode>, } #[async_trait] impl GraphStore for FailingGraph { async fn add_node(&mut self, node: CodeNode) -> Result<()> { self.nodes.insert(node.id, node); Ok(()) } async fn get_node(&self, id: NodeId) -> Result<Option<CodeNode>> { Ok(self.nodes.get(&id).map(|e| e.clone())) } async fn update_node(&mut self, _node: CodeNode) -> Result<()> { Err(CodeGraphError::Database("fail".into())) } async fn remove_node(&mut self, id: NodeId) -> Result<()> { self.nodes.remove(&id); Ok(()) } async fn find_nodes_by_name(&self, _name: &str) -> Result<Vec<CodeNode>> { Ok(vec![]) } } let parser = Arc::new(InMemoryParser { files: DashMap::new(), }); let graph = Arc::new(Mutex::new(FailingGraph { nodes: DashMap::new(), })); let updater = IncrementalUpdater::new(parser.clone(), graph.clone(), None); let f = "c.rs"; let a1 = make_node(f, "foo", "fn foo()->i32{1}"); parser.files.insert(f.into(), vec![a1.clone()]); // First update adds node successfully let _ = updater.update_file(f).await.unwrap(); // Second update triggers update failure and should rollback let mut a2 = a1.clone(); a2.content = Some("fn foo()->i32{2}".into()); parser.files.insert(f.into(), vec![a2.clone()]); let err = updater.update_file(f).await.err(); assert!(err.is_some()); // Node content should remain original let g = graph.lock().await; let cur = g.get_node(a1.id).await.unwrap().unwrap(); assert_eq!(cur.content.as_ref().unwrap().as_str(), "fn foo()->i32{1}"); } #[tokio::test] async fn cache_invalidation_works() { let parser = Arc::new(InMemoryParser { files: DashMap::new(), }); let graph = Arc::new(Mutex::new(InMemoryGraph { nodes: DashMap::new(), })); let updater = IncrementalUpdater::new(parser.clone(), graph.clone(), None).with_cache_capacity(2); let f1 = "d1.rs"; let f2 = "d2.rs"; let f3 = "d3.rs"; parser .files .insert(f1.into(), vec![make_node(f1, "a", "1")]); parser .files .insert(f2.into(), vec![make_node(f2, "b", "2")]); parser .files .insert(f3.into(), vec![make_node(f3, "c", "3")]); 
updater.update_file(f1).await.unwrap(); updater.update_file(f2).await.unwrap(); updater.update_file(f3).await.unwrap(); // capacity 2 should evict at least one (the oldest) let c = updater.cache(); let present = c.get(f1).is_some() as i32 + c.get(f2).is_some() as i32 + c.get(f3).is_some() as i32; assert!(present >= 2); // not strict but ensures eviction ran } #[tokio::test] async fn vector_updates_only_for_changes() { // Wire vector integrator with simple hasher embedder let parser = Arc::new(InMemoryParser { files: DashMap::new(), }); let graph = Arc::new(Mutex::new(InMemoryGraph { nodes: DashMap::new(), })); let vstore = InMemoryVectorStore { embs: DashMap::new(), }; let embedder = Arc::new(crate::integration::graph_vector::HasherEmbeddingService::new(64)); // Provide a graph instance for vector integrator; it won't be used for indexing path here. let g_for_vec: Arc<dyn GraphStore> = Arc::new(InMemoryGraph { nodes: DashMap::new(), }); let integrator = Arc::new(GraphVectorIntegrator::new( g_for_vec, Box::new(vstore), embedder, )); let updater = IncrementalUpdater::new(parser.clone(), graph.clone(), Some(integrator.clone())); let f = "vec.rs"; let a1 = make_node(f, "foo", "fn foo(){1}"); parser.files.insert(f.into(), vec![a1.clone()]); updater.update_file(f).await.unwrap(); let first_ct = integrator.signature_len(); // unchanged update should not add embeddings updater.update_file(f).await.unwrap(); assert_eq!(first_ct, integrator.signature_len()); // change content → should update signature count unchanged but reembed let mut a2 = a1.clone(); a2.content = Some("fn foo(){2}".into()); parser.files.insert(f.into(), vec![a2.clone()]); updater.update_file(f).await.unwrap(); assert_eq!(integrator.signature_len(), first_ct); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.