use crate::integration::graph_vector::{GraphVectorIntegrator, SnippetExtractor};
use crate::traits::{CodeParser, GraphStore};
use crate::{CodeNode, Language, NodeId, NodeType, Result};
use dashmap::DashMap;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tracing::warn;
/// Unique identity of a code element within a file, used for diffing.
/// Name, type, language, and file path are all included to minimize collisions.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeKey {
pub file_path: String,
pub name: String,
pub node_type: Option<NodeType>,
pub language: Option<Language>,
}
impl NodeKey {
pub fn from_node(n: &CodeNode) -> Self {
Self {
file_path: n.location.file_path.clone(),
name: n.name.as_str().to_string(),
node_type: n.node_type.clone(),
language: n.language.clone(),
}
}
}
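// Manual `Hash` impl: `None` fields are skipped rather than hashed. Equal keys
// still hash equally (the derived `PartialEq` compares the same fields), so
// skipping `None` only affects collision resistance, not map correctness.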
impl Hash for NodeKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.file_path.hash(state);
self.name.hash(state);
if let Some(t) = &self.node_type {
t.hash(state);
}
if let Some(l) = &self.language {
l.hash(state);
}
}
}
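/// A node's stable id paired with its content signature, as recorded in a file snapshot.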
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeSig {
pub node_id: NodeId,
pub signature: u64,
}
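/// Per-file snapshot of node keys and signatures; the baseline for the next diff.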
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileAstSnapshot {
pub file_path: String,
pub updated_at: std::time::SystemTime,
pub entries: HashMap<NodeKey, NodeSig>,
}
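/// Concurrent cache of per-file snapshots with a soft file-count limit.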
#[derive(Debug, Default)]
pub struct IncrementalCache {
// file -> snapshot
by_file: DashMap<String, Arc<RwLock<FileAstSnapshot>>>,
// node -> file (for fast invalidation)
node_to_file: DashMap<NodeId, String>,
// capacity limits (simple LRU-like eviction by timestamp)
max_files: usize,
}
impl IncrementalCache {
pub fn new(max_files: usize) -> Self {
Self {
by_file: DashMap::new(),
node_to_file: DashMap::new(),
max_files,
}
}
pub fn get(&self, file: &str) -> Option<Arc<RwLock<FileAstSnapshot>>> {
self.by_file.get(file).map(|e| e.clone())
}
pub fn insert(&self, snapshot: FileAstSnapshot) {
let file = snapshot.file_path.clone();
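        // Keep the reverse node -> file index in sync for node-level invalidation.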
for (_k, v) in snapshot.entries.iter() {
self.node_to_file.insert(v.node_id, file.clone());
}
self.by_file
.insert(file.clone(), Arc::new(RwLock::new(snapshot)));
self.evict_if_needed();
}
pub fn invalidate_file(&self, file: &str) {
if let Some(s) = self.by_file.remove(file) {
let snap = s.1.read();
for (_k, v) in snap.entries.iter() {
self.node_to_file.remove(&v.node_id);
}
}
}
pub fn invalidate_node(&self, node: NodeId) {
if let Some(file) = self.node_to_file.get(&node).map(|e| e.clone()) {
if let Some(s) = self.by_file.get(&file) {
let mut snap = s.write();
let old_keys: Vec<NodeKey> = snap
.entries
.iter()
.filter(|(_k, v)| v.node_id == node)
.map(|(k, _)| k.clone())
.collect();
for k in old_keys {
snap.entries.remove(&k);
}
}
self.node_to_file.remove(&node);
}
}
pub fn clear(&self) {
self.by_file.clear();
self.node_to_file.clear();
}
fn evict_if_needed(&self) {
if self.by_file.len() <= self.max_files {
return;
}
// naive eviction: remove the oldest by updated_at
let mut items: Vec<(String, std::time::SystemTime)> =
Vec::with_capacity(self.by_file.len());
for e in self.by_file.iter() {
items.push((e.key().clone(), e.value().read().updated_at));
}
items.sort_by_key(|(_f, t)| *t);
let to_remove = items.len().saturating_sub(self.max_files);
for i in 0..to_remove {
self.invalidate_file(&items[i].0);
}
}
}
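/// Result of diffing freshly parsed nodes against the previous snapshot of a file.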
#[derive(Debug, Clone)]
pub struct AstDiff {
pub added: Vec<CodeNode>,
pub changed: Vec<(NodeId, CodeNode)>, // (old_id, new_node_with_preserved_id)
pub removed: Vec<NodeId>,
}
impl AstDiff {
pub fn empty() -> Self {
Self {
added: vec![],
changed: vec![],
removed: vec![],
}
}
}
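/// Summary statistics for a single file update.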
#[derive(Debug)]
pub struct UpdateResult {
pub file_path: String,
pub added: usize,
pub changed: usize,
pub removed: usize,
pub duration: Duration,
}
/// Tracks applied operations so we can roll back on failure.
#[derive(Default)]
struct UpdateTransactionLog {
added: Vec<NodeId>,
updated_prev: Vec<CodeNode>,
removed_prev: Vec<CodeNode>,
}
/// Incremental updater orchestrating the parser, graph store, vector sync, and cache for single-file updates.
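///
/// # Usage sketch
///
/// A minimal, illustrative flow. `parser` and `graph` are assumed to be
/// existing implementations of [`CodeParser`] and [`GraphStore`] (hypothetical
/// stand-ins here, much like the in-memory versions in the tests below):
///
/// ```ignore
/// let updater = IncrementalUpdater::new(parser, graph, None)
///     .with_cache_capacity(10_000);
/// let result = updater.update_file("src/lib.rs").await?;
/// println!("added={} changed={} removed={}", result.added, result.changed, result.removed);
/// ```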
pub struct IncrementalUpdater {
parser: Arc<dyn CodeParser>,
graph: Arc<Mutex<dyn GraphStore + Send>>, // serialized graph ops
vector: Option<Arc<GraphVectorIntegrator>>, // vector sync (optional)
cache: Arc<IncrementalCache>,
extractor: SnippetExtractor, // for robust signatures
}
impl IncrementalUpdater {
pub fn new(
parser: Arc<dyn CodeParser>,
graph: Arc<Mutex<dyn GraphStore + Send>>,
vector: Option<Arc<GraphVectorIntegrator>>,
) -> Self {
Self {
parser,
graph,
vector,
cache: Arc::new(IncrementalCache::new(100_000)),
extractor: SnippetExtractor::default(),
}
}
pub fn with_cache_capacity(mut self, max_files: usize) -> Self {
self.cache = Arc::new(IncrementalCache::new(max_files));
self
}
pub fn cache(&self) -> Arc<IncrementalCache> {
self.cache.clone()
}
pub fn extractor_mut(&mut self) -> &mut SnippetExtractor {
&mut self.extractor
}
/// Compute a stable content signature for a node (used for change detection and vector updates).
fn signature(&self, node: &CodeNode) -> u64 {
let mut s = std::collections::hash_map::DefaultHasher::new();
        // Prefer the node's explicit content; fall back to an extracted snippet
if let Some(c) = &node.content {
c.as_str().hash(&mut s);
} else {
let snip = self.extractor.extract(node);
snip.hash(&mut s);
}
// Also include key attributes to reduce accidental collisions
NodeKey::from_node(node).hash(&mut s);
s.finish()
}
#[allow(dead_code)]
fn build_snapshot(&self, file_path: &str, nodes: &[CodeNode]) -> FileAstSnapshot {
let mut entries = HashMap::with_capacity(nodes.len());
for n in nodes {
let key = NodeKey::from_node(n);
let sig = self.signature(n);
entries.insert(
key,
NodeSig {
node_id: n.id,
signature: sig,
},
);
}
FileAstSnapshot {
file_path: file_path.to_string(),
updated_at: std::time::SystemTime::now(),
entries,
}
}
fn diff_snapshots(
&self,
old: Option<&FileAstSnapshot>,
new_nodes: &mut Vec<CodeNode>,
) -> AstDiff {
let mut diff = AstDiff::empty();
let mut new_pairs: Vec<(NodeKey, CodeNode)> = Vec::with_capacity(new_nodes.len());
for n in new_nodes.drain(..) {
let k = NodeKey::from_node(&n);
new_pairs.push((k, n));
}
let new_key_set: HashSet<NodeKey> = new_pairs.iter().map(|(k, _)| k.clone()).collect();
if let Some(old_snap) = old {
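            // Classify each new node: unseen key -> added; known key with a new
            // signature -> changed (the old NodeId is preserved); identical
            // signature -> unchanged, so it drops out of the diff entirely.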
for (k, mut new_node) in new_pairs.into_iter() {
match old_snap.entries.get(&k) {
None => {
diff.added.push(new_node);
}
Some(v) => {
let sig_new = self.signature(&new_node);
if sig_new != v.signature {
new_node.id = v.node_id;
diff.changed.push((v.node_id, new_node));
}
}
}
}
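            // Keys present in the old snapshot but absent from the new parse were removed.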
for (k, v) in old_snap.entries.iter() {
if !new_key_set.contains(k) {
diff.removed.push(v.node_id);
}
}
} else {
diff.added.extend(new_pairs.into_iter().map(|(_, n)| n));
}
diff
}
/// Update a single file: parse, diff, apply selective graph updates, vector maintenance, cache update.
pub async fn update_file(&self, file_path: &str) -> Result<UpdateResult> {
let start = Instant::now();
// Parse new AST nodes
let mut nodes = self.parser.parse_file(file_path).await?;
// Load old snapshot (if any)
let old_snapshot_arc = self.cache.get(file_path);
let old_snapshot = old_snapshot_arc.as_ref().map(|a| a.read().clone());
// Diff
let diff = self.diff_snapshots(old_snapshot.as_ref(), &mut nodes);
// Apply updates with rollback log
let mut txlog = UpdateTransactionLog::default();
if let Err(e) = self.apply_diff(file_path, &diff, &mut txlog).await {
warn!("apply_diff failed: {} - rolling back", e);
if let Err(rb) = self.rollback(&txlog).await {
warn!("rollback also failed: {}", rb);
}
return Err(e);
}
        // Vector maintenance (best-effort; graph updates are not rolled back if vector sync fails)
if let Some(v) = &self.vector {
// Build the changed+added payload
let mut changed_nodes: Vec<CodeNode> =
Vec::with_capacity(diff.changed.len() + diff.added.len());
changed_nodes.extend(diff.changed.iter().map(|(_id, n)| n.clone()));
changed_nodes.extend(diff.added.iter().cloned());
let deleted_ids: Vec<NodeId> = diff.removed.clone();
if let Err(ve) = v.sync_changes(&changed_nodes, &deleted_ids).await {
warn!("vector sync failed (non-fatal): {}", ve);
}
}
// Update cache snapshot by transforming previous entries
let mut new_entries: HashMap<NodeKey, NodeSig> = old_snapshot
.as_ref()
.map(|s| s.entries.clone())
.unwrap_or_default();
// removals
let removed_set: HashSet<NodeId> = diff.removed.iter().cloned().collect();
new_entries.retain(|_k, v| !removed_set.contains(&v.node_id));
// changes and additions
for (_old, n) in diff.changed.iter() {
let key = NodeKey::from_node(n);
new_entries.insert(
key,
NodeSig {
node_id: n.id,
signature: self.signature(n),
},
);
}
for n in diff.added.iter() {
let key = NodeKey::from_node(n);
new_entries.insert(
key,
NodeSig {
node_id: n.id,
signature: self.signature(n),
},
);
}
self.cache.insert(FileAstSnapshot {
file_path: file_path.to_string(),
updated_at: std::time::SystemTime::now(),
entries: new_entries,
});
Ok(UpdateResult {
file_path: file_path.to_string(),
added: diff.added.len(),
changed: diff.changed.len(),
removed: diff.removed.len(),
duration: start.elapsed(),
})
}
async fn apply_diff(
&self,
_file_path: &str,
diff: &AstDiff,
txlog: &mut UpdateTransactionLog,
) -> Result<()> {
let mut graph = self.graph.lock().await;
// Apply changed: preserve NodeId, update node
for (old_id, mut new_node) in diff.changed.iter().cloned() {
// backup existing
if let Some(prev) = graph.get_node(old_id).await? {
txlog.updated_prev.push(prev.clone());
}
new_node.id = old_id; // ensure id is stable
graph.update_node(new_node.clone()).await?;
}
// Apply added
for n in diff.added.iter().cloned() {
graph.add_node(n.clone()).await?;
txlog.added.push(n.id);
}
// Apply removed
for id in diff.removed.iter().cloned() {
if let Some(prev) = graph.get_node(id).await? {
txlog.removed_prev.push(prev.clone());
}
graph.remove_node(id).await?;
}
Ok(())
}
async fn rollback(&self, txlog: &UpdateTransactionLog) -> Result<()> {
let mut graph = self.graph.lock().await;
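        // Best-effort: individual revert failures are ignored (the `let _ =` below)
        // so that as much prior state as possible is restored.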
// 1) Re-add removed nodes
for n in txlog.removed_prev.iter().cloned() {
let _ = graph.add_node(n).await;
}
// 2) Revert updates
for n in txlog.updated_prev.iter().cloned() {
let _ = graph.update_node(n).await;
}
// 3) Remove added nodes
for id in txlog.added.iter().cloned() {
let _ = graph.remove_node(id).await;
}
Ok(())
}
// Cache invalidation helpers
pub fn invalidate_file(&self, file: &str) {
self.cache.invalidate_file(file);
}
pub fn invalidate_node(&self, id: NodeId) {
self.cache.invalidate_node(id);
}
pub fn clear_cache(&self) {
self.cache.clear();
}
}
// -----------------------------
// Tests
// -----------------------------
#[cfg(test)]
mod tests {
use super::*;
use crate::traits::VectorStore;
use crate::CodeGraphError;
use async_trait::async_trait;
struct InMemoryParser {
files: DashMap<String, Vec<CodeNode>>,
}
#[async_trait]
impl CodeParser for InMemoryParser {
async fn parse_file(&self, file_path: &str) -> Result<Vec<CodeNode>> {
Ok(self
.files
.get(file_path)
.map(|e| e.clone())
.unwrap_or_default())
}
fn supported_languages(&self) -> Vec<crate::Language> {
vec![Language::Rust]
}
}
struct InMemoryGraph {
nodes: DashMap<NodeId, CodeNode>,
}
#[async_trait]
impl GraphStore for InMemoryGraph {
async fn add_node(&mut self, node: CodeNode) -> Result<()> {
self.nodes.insert(node.id, node);
Ok(())
}
async fn get_node(&self, id: NodeId) -> Result<Option<CodeNode>> {
Ok(self.nodes.get(&id).map(|e| e.clone()))
}
async fn update_node(&mut self, node: CodeNode) -> Result<()> {
self.nodes.insert(node.id, node);
Ok(())
}
async fn remove_node(&mut self, id: NodeId) -> Result<()> {
self.nodes.remove(&id);
Ok(())
}
async fn find_nodes_by_name(&self, name: &str) -> Result<Vec<CodeNode>> {
Ok(self
.nodes
.iter()
.filter(|e| e.name.as_str() == name)
.map(|e| e.clone())
.collect())
}
}
struct InMemoryVectorStore {
embs: DashMap<NodeId, Vec<f32>>,
}
#[async_trait]
impl VectorStore for InMemoryVectorStore {
async fn store_embeddings(&mut self, nodes: &[CodeNode]) -> Result<()> {
for n in nodes {
if let Some(e) = &n.embedding {
self.embs.insert(n.id, e.clone());
}
}
Ok(())
}
async fn search_similar(
&self,
_query_embedding: &[f32],
_limit: usize,
) -> Result<Vec<NodeId>> {
Ok(vec![])
}
async fn get_embedding(&self, node_id: NodeId) -> Result<Option<Vec<f32>>> {
Ok(self.embs.get(&node_id).map(|e| e.clone()))
}
}
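    // Test helper: builds a minimal CodeNode; location and metadata are filler values.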
fn make_node(file: &str, name: &str, content: &str) -> CodeNode {
let now = chrono::Utc::now();
CodeNode {
id: NodeId::new_v4(),
name: name.into(),
node_type: Some(NodeType::Function),
language: Some(Language::Rust),
location: crate::Location {
file_path: file.into(),
line: 1,
column: 1,
end_line: Some(1),
end_column: Some(10),
},
content: Some(content.into()),
metadata: crate::Metadata {
attributes: Default::default(),
created_at: now,
updated_at: now,
},
embedding: None,
complexity: None,
}
}
#[tokio::test]
async fn detect_add_change_remove() {
let parser = Arc::new(InMemoryParser {
files: DashMap::new(),
});
let graph = Arc::new(Mutex::new(InMemoryGraph {
nodes: DashMap::new(),
}));
let vector = None;
let updater = IncrementalUpdater::new(parser.clone(), graph.clone(), vector);
// initial file with two functions
let f = "a.rs";
let a1 = make_node(f, "foo", "fn foo()->i32{1}");
let b1 = make_node(f, "bar", "fn bar()->i32{2}");
parser.files.insert(f.into(), vec![a1.clone(), b1.clone()]);
let r1 = updater.update_file(f).await.unwrap();
assert_eq!(r1.added, 2);
assert_eq!(r1.changed, 0);
assert_eq!(r1.removed, 0);
// modify foo, remove bar, add baz
let mut a2 = a1.clone();
a2.content = Some("fn foo()->i32{3}".into());
let c1 = make_node(f, "baz", "fn baz()->i32{4}");
parser.files.insert(f.into(), vec![a2.clone(), c1.clone()]);
let r2 = updater.update_file(f).await.unwrap();
assert_eq!(r2.added, 1); // baz
assert_eq!(r2.changed, 1); // foo
assert_eq!(r2.removed, 1); // bar
}
#[tokio::test]
async fn preserves_node_id_on_change() {
let parser = Arc::new(InMemoryParser {
files: DashMap::new(),
});
let graph = Arc::new(Mutex::new(InMemoryGraph {
nodes: DashMap::new(),
}));
let updater = IncrementalUpdater::new(parser.clone(), graph.clone(), None);
let f = "b.rs";
let a1 = make_node(f, "foo", "fn foo()->i32{1}");
parser.files.insert(f.into(), vec![a1.clone()]);
updater.update_file(f).await.unwrap();
// change content, keep identity
let mut a2 = a1.clone();
a2.content = Some("fn foo()->i32{42}".into());
parser.files.insert(f.into(), vec![a2.clone()]);
updater.update_file(f).await.unwrap();
let g = graph.lock().await;
let cur = g.get_node(a1.id).await.unwrap().unwrap();
assert_eq!(cur.id, a1.id);
assert_eq!(cur.content.as_ref().unwrap().as_str(), "fn foo()->i32{42}");
}
#[tokio::test]
async fn rollback_on_failure_reverts_changes() {
// Graph that fails on update to simulate an error
struct FailingGraph {
nodes: DashMap<NodeId, CodeNode>,
}
#[async_trait]
impl GraphStore for FailingGraph {
async fn add_node(&mut self, node: CodeNode) -> Result<()> {
self.nodes.insert(node.id, node);
Ok(())
}
async fn get_node(&self, id: NodeId) -> Result<Option<CodeNode>> {
Ok(self.nodes.get(&id).map(|e| e.clone()))
}
async fn update_node(&mut self, _node: CodeNode) -> Result<()> {
Err(CodeGraphError::Database("fail".into()))
}
async fn remove_node(&mut self, id: NodeId) -> Result<()> {
self.nodes.remove(&id);
Ok(())
}
async fn find_nodes_by_name(&self, _name: &str) -> Result<Vec<CodeNode>> {
Ok(vec![])
}
}
let parser = Arc::new(InMemoryParser {
files: DashMap::new(),
});
let graph = Arc::new(Mutex::new(FailingGraph {
nodes: DashMap::new(),
}));
let updater = IncrementalUpdater::new(parser.clone(), graph.clone(), None);
let f = "c.rs";
let a1 = make_node(f, "foo", "fn foo()->i32{1}");
parser.files.insert(f.into(), vec![a1.clone()]);
// First update adds node successfully
let _ = updater.update_file(f).await.unwrap();
// Second update triggers update failure and should rollback
let mut a2 = a1.clone();
a2.content = Some("fn foo()->i32{2}".into());
parser.files.insert(f.into(), vec![a2.clone()]);
let err = updater.update_file(f).await.err();
assert!(err.is_some());
// Node content should remain original
let g = graph.lock().await;
let cur = g.get_node(a1.id).await.unwrap().unwrap();
assert_eq!(cur.content.as_ref().unwrap().as_str(), "fn foo()->i32{1}");
}
#[tokio::test]
    async fn cache_eviction_works() {
let parser = Arc::new(InMemoryParser {
files: DashMap::new(),
});
let graph = Arc::new(Mutex::new(InMemoryGraph {
nodes: DashMap::new(),
}));
let updater =
IncrementalUpdater::new(parser.clone(), graph.clone(), None).with_cache_capacity(2);
let f1 = "d1.rs";
let f2 = "d2.rs";
let f3 = "d3.rs";
parser
.files
.insert(f1.into(), vec![make_node(f1, "a", "1")]);
parser
.files
.insert(f2.into(), vec![make_node(f2, "b", "2")]);
parser
.files
.insert(f3.into(), vec![make_node(f3, "c", "3")]);
updater.update_file(f1).await.unwrap();
updater.update_file(f2).await.unwrap();
updater.update_file(f3).await.unwrap();
        // capacity 2 should evict exactly one file (the oldest by timestamp)
        let c = updater.cache();
        let present =
            c.get(f1).is_some() as i32 + c.get(f2).is_some() as i32 + c.get(f3).is_some() as i32;
        assert_eq!(present, 2); // eviction trimmed the cache back to capacity
}
#[tokio::test]
async fn vector_updates_only_for_changes() {
// Wire vector integrator with simple hasher embedder
let parser = Arc::new(InMemoryParser {
files: DashMap::new(),
});
let graph = Arc::new(Mutex::new(InMemoryGraph {
nodes: DashMap::new(),
}));
let vstore = InMemoryVectorStore {
embs: DashMap::new(),
};
let embedder = Arc::new(crate::integration::graph_vector::HasherEmbeddingService::new(64));
        // Provide a graph instance for the vector integrator; it is not used on the indexing path here.
let g_for_vec: Arc<dyn GraphStore> = Arc::new(InMemoryGraph {
nodes: DashMap::new(),
});
let integrator = Arc::new(GraphVectorIntegrator::new(
g_for_vec,
Box::new(vstore),
embedder,
));
let updater =
IncrementalUpdater::new(parser.clone(), graph.clone(), Some(integrator.clone()));
let f = "vec.rs";
let a1 = make_node(f, "foo", "fn foo(){1}");
parser.files.insert(f.into(), vec![a1.clone()]);
updater.update_file(f).await.unwrap();
let first_ct = integrator.signature_len();
// unchanged update should not add embeddings
updater.update_file(f).await.unwrap();
assert_eq!(first_ct, integrator.signature_len());
        // change content → the node is re-embedded, but the signature count stays the same
let mut a2 = a1.clone();
a2.content = Some("fn foo(){2}".into());
parser.files.insert(f.into(), vec![a2.clone()]);
updater.update_file(f).await.unwrap();
assert_eq!(integrator.signature_len(), first_ct);
}
}