CodeGraph CLI MCP Server

by Jakedismo
indexer.rs (152 kB)
// ABOUTME: Drives the indexing pipeline for the Codegraph MCP CLI.
// ABOUTME: Coordinates parsing, embeddings, and persistence into SurrealDB.

#![allow(dead_code, unused_variables, unused_imports)]

use crate::estimation::{
    extend_symbol_index, parse_files_with_unified_extraction as shared_unified_parse,
};
use anyhow::{anyhow, Context, Result};
use codegraph_core::{CodeNode, EdgeRelationship, NodeId, NodeType};
use codegraph_graph::ChunkEmbeddingRecord;
use codegraph_graph::{
    edge::CodeEdge, FileMetadataRecord, NodeEmbeddingRecord, ProjectMetadataRecord,
    SurrealDbConfig, SurrealDbStorage, SymbolEmbeddingRecord, SURR_EMBEDDING_COLUMN_1024,
    SURR_EMBEDDING_COLUMN_1536, SURR_EMBEDDING_COLUMN_2048, SURR_EMBEDDING_COLUMN_2560,
    SURR_EMBEDDING_COLUMN_3072, SURR_EMBEDDING_COLUMN_384, SURR_EMBEDDING_COLUMN_4096,
    SURR_EMBEDDING_COLUMN_768,
};
use codegraph_parser::TreeSitterParser;
#[cfg(feature = "embeddings")]
use codegraph_vector::prep::chunker::ChunkPlan;
#[cfg(feature = "ai-enhanced")]
use futures::{stream, StreamExt};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use num_cpus;
use rayon::prelude::*;
use regex::Regex;
use rustc_demangle::try_demangle;
use sha2::{Digest, Sha256};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::env;
use std::ffi::OsStr;
use std::fs;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use symbolic_demangle::demangle;
use syn::{parse_str as parse_syn_path, Path as SynPath, PathArguments};
use tokio::fs as tokio_fs;
use tokio::sync::{mpsc, oneshot, Mutex as TokioMutex};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
use url::Url;
use walkdir::WalkDir;

const SYMBOL_EMBEDDING_DB_BATCH_LIMIT: usize = 256;

#[derive(Debug, Clone, PartialEq)]
pub enum FileChangeType {
    Added,
    Modified,
    Deleted,
    Unchanged,
}

#[derive(Debug, Clone)]
pub struct FileChange {
    pub file_path: String,
    pub change_type: FileChangeType,
    pub current_hash: Option<String>,
    pub previous_hash: Option<String>,
}

#[derive(Clone, Copy, Debug)]
enum SurrealEmbeddingColumn {
    Embedding384,
    Embedding768,
    Embedding1024,
    Embedding1536,
    Embedding2048,
    Embedding2560,
    Embedding3072,
    Embedding4096,
}

impl SurrealEmbeddingColumn {
    fn column_name(&self) -> &'static str {
        match self {
            SurrealEmbeddingColumn::Embedding384 => SURR_EMBEDDING_COLUMN_384,
            SurrealEmbeddingColumn::Embedding768 => SURR_EMBEDDING_COLUMN_768,
            SurrealEmbeddingColumn::Embedding1024 => SURR_EMBEDDING_COLUMN_1024,
            SurrealEmbeddingColumn::Embedding1536 => SURR_EMBEDDING_COLUMN_1536,
            SurrealEmbeddingColumn::Embedding2048 => SURR_EMBEDDING_COLUMN_2048,
            SurrealEmbeddingColumn::Embedding2560 => SURR_EMBEDDING_COLUMN_2560,
            SurrealEmbeddingColumn::Embedding3072 => SURR_EMBEDDING_COLUMN_3072,
            SurrealEmbeddingColumn::Embedding4096 => SURR_EMBEDDING_COLUMN_4096,
        }
    }

    fn dimension(&self) -> usize {
        match self {
            SurrealEmbeddingColumn::Embedding384 => 384,
            SurrealEmbeddingColumn::Embedding768 => 768,
            SurrealEmbeddingColumn::Embedding1024 => 1024,
            SurrealEmbeddingColumn::Embedding1536 => 1536,
            SurrealEmbeddingColumn::Embedding2048 => 2048,
            SurrealEmbeddingColumn::Embedding2560 => 2560,
            SurrealEmbeddingColumn::Embedding3072 => 3072,
            SurrealEmbeddingColumn::Embedding4096 => 4096,
        }
    }
}
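
// NOTE (reader aid): each supported embedding width maps to a dedicated SurrealDB
// column constant above; resolve_surreal_embedding_column() (used in
// ProjectIndexer::new below) performs the reverse lookup from a runtime dimension.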

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct IndexerConfig {
    pub languages: Vec<String>,
    pub exclude_patterns: Vec<String>,
    pub include_patterns: Vec<String>,
    pub recursive: bool,
    pub force_reindex: bool,
    pub watch: bool,
    pub workers: usize,
    pub batch_size: usize,
    pub max_concurrent: usize,
    pub vector_dimension: usize,
    pub device: Option<String>,
    pub max_seq_len: usize,
    pub symbol_batch_size: Option<usize>,
    pub symbol_max_concurrent: Option<usize>,
    /// Root directory of the project being indexed (where .codegraph/ will be created).
    /// Defaults to the current directory if not specified.
    pub project_root: PathBuf,
}

impl Default for IndexerConfig {
    fn default() -> Self {
        Self {
            languages: vec![],
            exclude_patterns: vec![],
            include_patterns: vec![],
            recursive: true,
            force_reindex: false,
            watch: false,
            workers: 4,
            batch_size: 100,
            max_concurrent: 10,
            vector_dimension: 384, // Match EmbeddingGenerator default (all-MiniLM-L6-v2)
            device: None,
            max_seq_len: 512,
            symbol_batch_size: None,
            symbol_max_concurrent: None,
            project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
        }
    }
}

impl From<&IndexerConfig> for codegraph_parser::file_collect::FileCollectionConfig {
    fn from(config: &IndexerConfig) -> Self {
        codegraph_parser::file_collect::FileCollectionConfig {
            recursive: config.recursive,
            languages: config.languages.clone(),
            include_patterns: config.include_patterns.clone(),
            exclude_patterns: config.exclude_patterns.clone(),
        }
    }
}

pub struct ProjectIndexer {
    config: IndexerConfig,
    global_config: codegraph_core::config_manager::CodeGraphConfig,
    progress: MultiProgress,
    parser: TreeSitterParser,
    surreal: Arc<TokioMutex<SurrealDbStorage>>,
    surreal_writer: Option<SurrealWriterHandle>,
    project_id: String,
    organization_id: Option<String>,
    repository_url: Option<String>,
    domain: Option<String>,
    embedding_model: String,
    vector_dim: usize,
    embedding_column: SurrealEmbeddingColumn,
    project_root: PathBuf,
    #[cfg(feature = "embeddings")]
    embedder: codegraph_vector::EmbeddingGenerator,
}

enum SurrealWriteJob {
    Nodes(Vec<CodeNode>),
    Edges(Vec<CodeEdge>),
    NodeEmbeddings(Vec<NodeEmbeddingRecord>),
    SymbolEmbeddings(Vec<SymbolEmbeddingRecord>),
    ChunkEmbeddings(Vec<ChunkEmbeddingRecord>),
    FileMetadata(Vec<FileMetadataRecord>),
    DeleteNodesByFile {
        file_paths: Vec<String>,
        project_id: String,
    },
    ProjectMetadata(ProjectMetadataRecord),
    Flush(oneshot::Sender<Result<()>>),
    Shutdown(oneshot::Sender<Result<()>>),
}

struct SurrealWriterHandle {
    tx: mpsc::Sender<SurrealWriteJob>,
    join: JoinHandle<()>,
}
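
// The writer handle below implements a single-consumer write pipeline: callers
// enqueue batched jobs over an mpsc channel, and a dedicated task round-robins
// them across the SurrealDB connection pool so parsing and embedding never block
// on database latency. Flush/Shutdown use oneshot channels to surface the last
// error the task observed.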

impl SurrealWriterHandle {
    fn new(pool: Vec<Arc<TokioMutex<SurrealDbStorage>>>) -> Self {
        let (tx, mut rx) = mpsc::channel(8);
        let pool_arc = Arc::new(pool);
        let join = tokio::spawn(async move {
            let mut last_error: Option<anyhow::Error> = None;
            let mut rr: usize = 0;
            while let Some(job) = rx.recv().await {
                let pool_len = pool_arc.len().max(1);
                let storage = pool_arc[rr % pool_len].clone();
                rr = rr.wrapping_add(1);
                match job {
                    SurrealWriteJob::Nodes(nodes) => {
                        if nodes.is_empty() {
                            continue;
                        }
                        if let Err(err) = {
                            let mut guard = storage.lock().await;
                            guard.upsert_nodes_batch(&nodes).await
                        } {
                            error!("Surreal node batch failed: {}", err);
                            last_error = Some(anyhow!(err.to_string()));
                        }
                    }
                    SurrealWriteJob::Edges(edges) => {
                        if edges.is_empty() {
                            continue;
                        }
                        if let Err(err) = {
                            let mut guard = storage.lock().await;
                            guard.upsert_edges_batch(&edges).await
                        } {
                            error!("Surreal edge batch failed: {}", err);
                            last_error = Some(anyhow!(err.to_string()));
                        }
                    }
                    SurrealWriteJob::NodeEmbeddings(records) => {
                        if records.is_empty() {
                            continue;
                        }
                        if let Err(err) = {
                            let guard = storage.lock().await;
                            guard.update_node_embeddings_batch(&records).await
                        } {
                            error!("Surreal node embedding batch failed: {}", err);
                            last_error = Some(anyhow!(err.to_string()));
                        }
                    }
                    SurrealWriteJob::SymbolEmbeddings(records) => {
                        if records.is_empty() {
                            continue;
                        }
                        if let Err(err) = {
                            let guard = storage.lock().await;
                            guard.upsert_symbol_embeddings_batch(&records).await
                        } {
                            error!("Surreal symbol embedding batch failed: {}", err);
                            last_error = Some(anyhow!(err.to_string()));
                        }
                    }
                    SurrealWriteJob::ChunkEmbeddings(records) => {
                        if records.is_empty() {
                            continue;
                        }
                        let batch_size = records.len();
                        if let Err(err) = {
                            let guard = storage.lock().await;
                            guard.upsert_chunk_embeddings_resilient(&records).await
                        } {
                            error!(
                                "🧩 Surreal chunk embedding batch failed ({} records): {}",
                                batch_size, err
                            );
                            if last_error.is_none() {
                                last_error = Some(anyhow!(err.to_string()));
                            }
                        } else {
                            debug!("🧩 Surreal chunk batch persisted: {} records", batch_size);
                        }
                    }
                    SurrealWriteJob::FileMetadata(records) => {
                        if records.is_empty() {
                            continue;
                        }
                        if let Err(err) = {
                            let guard = storage.lock().await;
                            guard.upsert_file_metadata_batch(&records).await
                        } {
                            error!("Surreal file metadata batch failed: {}", err);
                            last_error = Some(anyhow!(err.to_string()));
                        }
                    }
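                    // Deleting a file's nodes cascades to everything keyed off those
                    // node ids: symbol_embeddings, edges, chunks, and file metadata.
                    // Record ids arrive as `table:id`, hence the string::split on ':'.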
                    SurrealWriteJob::DeleteNodesByFile {
                        file_paths,
                        project_id,
                    } => {
                        if file_paths.is_empty() {
                            continue;
                        }
                        let guard = storage.lock().await;
                        let delete_nodes_query = "DELETE nodes WHERE project_id = $project_id AND file_path IN $file_paths RETURN BEFORE";
                        let mut result = match guard
                            .db()
                            .query(delete_nodes_query)
                            .bind(("project_id", project_id.clone()))
                            .bind(("file_paths", file_paths.clone()))
                            .await
                        {
                            Ok(res) => res,
                            Err(e) => {
                                error!("Failed to delete nodes for files {:?}: {}", file_paths, e);
                                last_error = Some(anyhow!(e.to_string()));
                                continue;
                            }
                        };
                        let deleted_nodes: Vec<HashMap<String, serde_json::Value>> =
                            result.take(0).unwrap_or_default();
                        let node_ids: Vec<String> = deleted_nodes
                            .iter()
                            .filter_map(|n| {
                                n.get("id").and_then(|v| v.as_str()).map(|s| s.to_string())
                            })
                            .collect();
                        if !node_ids.is_empty() {
                            let delete_symbols = r#"
                                LET $node_ids = $ids;
                                DELETE symbol_embeddings WHERE string::split(string::trim(node_id), ':')[1] IN $node_ids;
                            "#;
                            if let Err(e) = guard
                                .db()
                                .query(delete_symbols)
                                .bind(("ids", node_ids.clone()))
                                .await
                            {
                                error!(
                                    "Failed to delete symbol_embeddings for files {:?}: {}",
                                    file_paths, e
                                );
                                last_error = Some(anyhow!(e.to_string()));
                            }
                        }
                        if !node_ids.is_empty() {
                            let edge_query = r#"
                                LET $node_ids = $ids;
                                DELETE edges WHERE string::split(string::trim(from), ':')[1] IN $node_ids
                                    OR string::split(string::trim(to), ':')[1] IN $node_ids
                            "#;
                            if let Err(e) = guard
                                .db()
                                .query(edge_query)
                                .bind(("ids", node_ids.clone()))
                                .await
                            {
                                error!("Failed to delete edges for files {:?}: {}", file_paths, e);
                                last_error = Some(anyhow!(e.to_string()));
                            }
                            // Delete chunks that reference the deleted nodes
                            let chunks_query = r#"
                                LET $node_ids = $ids;
                                DELETE chunks WHERE string::split(string::trim(<string>parent_node), ':')[1] IN $node_ids
                            "#;
                            if let Err(e) =
                                guard.db().query(chunks_query).bind(("ids", node_ids)).await
                            {
                                error!("Failed to delete chunks for files {:?}: {}", file_paths, e);
                                last_error = Some(anyhow!(e.to_string()));
                            }
                        }
                        if let Err(e) = guard
                            .delete_file_metadata_for_files(&project_id, &file_paths)
                            .await
                        {
                            error!(
                                "Failed to delete file metadata for files {:?}: {}",
                                file_paths, e
                            );
                            last_error = Some(anyhow!(e.to_string()));
                        }
                    }
                    SurrealWriteJob::ProjectMetadata(record) => {
                        let result = {
                            let guard = storage.lock().await;
                            guard.upsert_project_metadata(record).await
                        };
                        if let Err(err) = result {
                            error!("Surreal project metadata write failed: {}", err);
                            last_error = Some(anyhow!(err.to_string()));
                        }
                    }
                    SurrealWriteJob::Flush(resp) => {
                        let _ = resp.send(Self::current_error(&last_error));
                    }
                    SurrealWriteJob::Shutdown(resp) => {
                        let _ = resp.send(Self::current_error(&last_error));
                        break;
                    }
                }
            }
        });
        Self { tx, join }
    }

    fn current_error(error: &Option<anyhow::Error>) -> Result<()> {
        if let Some(err) = error {
            Err(anyhow!(err.to_string()))
        } else {
            Ok(())
        }
    }

    async fn enqueue_nodes(&self, nodes: Vec<CodeNode>) -> Result<()> {
        if nodes.is_empty() {
            return Ok(());
        }
        self.tx
            .send(SurrealWriteJob::Nodes(nodes))
            .await
            .map_err(|e| anyhow!("Surreal writer unavailable: {}", e))
    }

    async fn enqueue_edges(&self, edges: Vec<CodeEdge>) -> Result<()> {
        if edges.is_empty() {
            return Ok(());
        }
        self.tx
            .send(SurrealWriteJob::Edges(edges))
            .await
            .map_err(|e| anyhow!("Surreal writer unavailable: {}", e))
    }

    async fn enqueue_node_embeddings(&self, records: Vec<NodeEmbeddingRecord>) -> Result<()> {
        if records.is_empty() {
            return Ok(());
        }
        self.tx
            .send(SurrealWriteJob::NodeEmbeddings(records))
            .await
            .map_err(|e| anyhow!("Surreal writer unavailable: {}", e))
    }

    async fn enqueue_chunk_embeddings(&self, records: Vec<ChunkEmbeddingRecord>) -> Result<()> {
        if records.is_empty() {
            return Ok(());
        }
        self.tx
            .send(SurrealWriteJob::ChunkEmbeddings(records))
            .await
            .map_err(|e| anyhow!("Surreal writer unavailable: {}", e))
    }

    async fn enqueue_symbol_embeddings(&self, records: Vec<SymbolEmbeddingRecord>) -> Result<()> {
        if records.is_empty() {
            return Ok(());
        }
        self.tx
            .send(SurrealWriteJob::SymbolEmbeddings(records))
            .await
            .map_err(|e| anyhow!("Surreal writer unavailable: {}", e))
    }

    async fn enqueue_file_metadata(&self, records: Vec<FileMetadataRecord>) -> Result<()> {
        if records.is_empty() {
            return Ok(());
        }
        self.tx
            .send(SurrealWriteJob::FileMetadata(records))
            .await
            .map_err(|e| anyhow!("Surreal writer unavailable: {}", e))
    }

    async fn enqueue_delete_nodes_by_file(
        &self,
        file_paths: Vec<String>,
        project_id: &str,
    ) -> Result<()> {
        if file_paths.is_empty() {
            return Ok(());
        }
        self.tx
            .send(SurrealWriteJob::DeleteNodesByFile {
                file_paths,
                project_id: project_id.to_string(),
            })
            .await
            .map_err(|e| anyhow!("Surreal writer unavailable: {}", e))
    }

    async fn enqueue_project_metadata(&self, record: ProjectMetadataRecord) -> Result<()> {
        self.tx
            .send(SurrealWriteJob::ProjectMetadata(record))
            .await
            .map_err(|e| anyhow!("Surreal writer unavailable: {}", e))
    }

    async fn flush(&self) -> Result<()> {
        let (resp_tx, resp_rx) = oneshot::channel();
        self.tx
            .send(SurrealWriteJob::Flush(resp_tx))
            .await
            .map_err(|e| anyhow!("Surreal writer unavailable: {}", e))?;
        resp_rx
            .await
            .map_err(|_| anyhow!("Surreal writer task ended unexpectedly"))?
    }

    async fn shutdown(self) -> Result<()> {
        let (resp_tx, resp_rx) = oneshot::channel();
        let _ = self.tx.send(SurrealWriteJob::Shutdown(resp_tx)).await;
        let result = resp_rx
            .await
            .unwrap_or_else(|_| Err(anyhow!("Surreal writer task ended unexpectedly")));
        let _ = self.join.await;
        result
    }
}
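
// Illustrative usage from a hypothetical caller (not part of this file), just to
// show how the pieces below fit together:
//
//     let mut indexer = ProjectIndexer::new(
//         IndexerConfig { languages: vec!["rust".into()], ..Default::default() },
//         &global_config,
//         MultiProgress::new(),
//     )
//     .await?;
//     let stats = indexer.index_project("./my-repo").await?;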

impl ProjectIndexer {
    #[cfg(feature = "ai-enhanced")]
    fn symbol_embedding_batch_settings(&self) -> (usize, usize) {
        let config_batch = self
            .config
            .symbol_batch_size
            .unwrap_or(self.config.batch_size);
        let config_concurrent = self
            .config
            .symbol_max_concurrent
            .unwrap_or_else(|| self.config.max_concurrent.max(2));
        let batch_size = std::env::var("CODEGRAPH_SYMBOL_BATCH_SIZE")
            .ok()
            .and_then(|v| v.parse::<usize>().ok())
            .unwrap_or(config_batch)
            .clamp(1, 2048);
        let max_concurrent = std::env::var("CODEGRAPH_SYMBOL_MAX_CONCURRENT")
            .ok()
            .and_then(|v| v.parse::<usize>().ok())
            .unwrap_or(config_concurrent)
            .clamp(1, 32);
        (batch_size, max_concurrent)
    }

    pub async fn new(
        mut config: IndexerConfig,
        global_config: &codegraph_core::config_manager::CodeGraphConfig,
        multi_progress: MultiProgress,
    ) -> Result<Self> {
        // Cap Rayon threads to a sensible default: leave one core free and obey user overrides
        let available = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);
        let env_workers = std::env::var("CODEGRAPH_WORKERS")
            .ok()
            .and_then(|v| v.parse::<usize>().ok());
        let requested = env_workers.unwrap_or(config.workers);
        let target_threads = requested
            .max(2)
            .min(rayon::max_num_threads().max(2))
            .min(available.saturating_sub(1).max(2));
        if std::env::var("RAYON_NUM_THREADS").is_err() {
            std::env::set_var("RAYON_NUM_THREADS", target_threads.to_string());
        }
        let rayon_pool_built = rayon::ThreadPoolBuilder::new()
            .num_threads(target_threads)
            .build_global()
            .is_ok();
        if rayon_pool_built {
            info!(
                "🔧 Rayon threads capped to {} (workers/env)",
                target_threads
            );
        } else {
            warn!(
                "Rayon global pool already set elsewhere. Using local pool for chunking. \
                 Current RAYON_NUM_THREADS={:?}.",
                std::env::var("RAYON_NUM_THREADS").ok()
            );
        }
        // Allow runtime override for embedding batch size
        if let Ok(val) = std::env::var("CODEGRAPH_EMBEDDINGS_BATCH_SIZE") {
            if let Ok(parsed) = val.parse::<usize>() {
                config.batch_size = parsed.clamp(1, 2048);
            }
        }
        let parser = TreeSitterParser::new();
        let project_root = config.project_root.clone();
        let (surreal, surreal_pool) = Self::connect_surreal_from_env().await?;
        let surreal_writer = SurrealWriterHandle::new(surreal_pool);
        let project_id = std::env::var("CODEGRAPH_PROJECT_ID")
            .unwrap_or_else(|_| project_root.display().to_string());
        let organization_id = std::env::var("CODEGRAPH_ORGANIZATION_ID").ok();
        let repository_url = std::env::var("CODEGRAPH_REPOSITORY_URL").ok();
        let domain = std::env::var("CODEGRAPH_DOMAIN").ok();
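        // Provider selection (block below): "local" builds an on-device engine when
        // the `embeddings-local` feature is compiled in; otherwise the generator is
        // constructed from the global config (e.g. Jina), with batch size and
        // concurrency forwarded where the provider supports them.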
        #[cfg(feature = "embeddings")]
        let embedder = {
            use codegraph_vector::EmbeddingGenerator;
            // Use global config for embedding provider
            let provider = global_config.embedding.provider.to_lowercase();
            if provider == "local" {
                #[cfg(feature = "embeddings-local")]
                {
                    use codegraph_vector::embeddings::generator::{
                        AdvancedEmbeddingGenerator, EmbeddingEngineConfig, LocalDeviceTypeCompat,
                        LocalEmbeddingConfigCompat, LocalPoolingCompat,
                    };
                    let mut cfg = EmbeddingEngineConfig::default();
                    cfg.prefer_local_first = true;
                    let device = match config
                        .device
                        .as_deref()
                        .unwrap_or("")
                        .to_lowercase()
                        .as_str()
                    {
                        "metal" => LocalDeviceTypeCompat::Metal,
                        d if d.starts_with("cuda:") => {
                            let id = d.trim_start_matches("cuda:").parse::<usize>().unwrap_or(0);
                            LocalDeviceTypeCompat::Cuda(id)
                        }
                        _ => LocalDeviceTypeCompat::Cpu,
                    };
                    let model_name = global_config.embedding.model.clone().unwrap_or_else(|| {
                        "sentence-transformers/all-MiniLM-L6-v2".to_string()
                    });
                    cfg.local = Some(LocalEmbeddingConfigCompat {
                        model_name,
                        device,
                        cache_dir: None,
                        max_sequence_length: config.max_seq_len.max(32),
                        pooling_strategy: LocalPoolingCompat::Mean,
                    });
                    // Try to construct advanced engine; fall back to simple generator on error
                    match AdvancedEmbeddingGenerator::new(cfg).await {
                        Ok(engine) => {
                            if !engine.has_provider() {
                                return Err(anyhow::anyhow!(
                                    "Local embedding provider constructed without a backend. \
                                     Ensure the model is BERT-compatible with safetensors and try --device metal or --device cpu"
                                ));
                            }
                            let mut g = EmbeddingGenerator::default();
                            g.set_advanced_engine(std::sync::Arc::new(engine));
                            tracing::info!(
                                target: "codegraph_mcp::indexer",
                                "Active embeddings: Local (device: {}, max_seq_len: {}, batch_size: {})",
                                config.device.as_deref().unwrap_or("cpu"),
                                config.max_seq_len,
                                config.batch_size
                            );
                            g
                        }
                        Err(e) => {
                            return Err(anyhow::anyhow!(
                                "Failed to initialize local embedding provider: {}",
                                e
                            ));
                        }
                    }
                }
                #[cfg(not(feature = "embeddings-local"))]
                {
                    tracing::warn!(
                        target: "codegraph_mcp::indexer",
                        "CODEGRAPH_EMBEDDING_PROVIDER=local requested but the 'embeddings-local' feature is not enabled; using auto provider"
                    );
                    let g = EmbeddingGenerator::with_auto_from_env().await;
                    // Set batch_size and max_concurrent for Jina provider if applicable
                    #[cfg(feature = "embeddings-jina")]
                    {
                        g.set_jina_batch_size(config.batch_size);
                        g.set_jina_max_concurrent(config.max_concurrent);
                    }
                    g
                }
            } else {
                let mut g = EmbeddingGenerator::with_config(global_config).await;
                // Set batch_size and max_concurrent for Jina provider if applicable
                #[cfg(feature = "embeddings-jina")]
                {
                    g.set_jina_batch_size(config.batch_size);
                    g.set_jina_max_concurrent(config.max_concurrent);
                }
                tracing::info!(
                    target: "codegraph_mcp::indexer",
                    "Active embeddings: {} (batch_size: {}, max_concurrent: {})",
                    global_config.embedding.provider,
                    config.batch_size,
                    config.max_concurrent
                );
                g
            }
        };
        let embedding_model_name = global_config
            .embedding
            .model
            .clone()
            .unwrap_or_else(|| "jina-embeddings-v4".to_string());
        let embedder_dimension = {
            #[cfg(feature = "embeddings")]
            {
                embedder.dimension()
            }
            #[cfg(not(feature = "embeddings"))]
            {
                config.vector_dimension
            }
        };
        let env_vector_dim = std::env::var("CODEGRAPH_EMBEDDING_DIMENSION")
            .ok()
            .and_then(|v| v.parse::<usize>().ok());
        let vector_dim = env_vector_dim.unwrap_or(embedder_dimension);
        let embedding_column = resolve_surreal_embedding_column(vector_dim).with_context(|| {
            format!(
                "Unsupported embedding dimension {}. Supported dimensions: 384, 768, 1024, 1536, 2048, 2560, 3072, 4096.",
                vector_dim
            )
        })?;
        if let Some(v) = env_vector_dim {
            info!(
                "🧭 Embedding dimension override: CODEGRAPH_EMBEDDING_DIMENSION={} → Surreal column {}",
                v,
                embedding_column.column_name()
            );
        } else {
            info!(
                "🧭 Embedding dimension resolved from provider ({}): {} → Surreal column {}",
                global_config.embedding.provider,
                vector_dim,
                embedding_column.column_name()
            );
        }
        Ok(Self {
            config,
            global_config: global_config.clone(),
            progress: multi_progress,
            parser,
            surreal,
            surreal_writer: Some(surreal_writer),
            project_id,
            organization_id,
            repository_url,
            domain,
            embedding_model: embedding_model_name,
            vector_dim,
            embedding_column,
            project_root,
            #[cfg(feature = "embeddings")]
            embedder,
        })
    }

    pub async fn index_project(&mut self, path: impl AsRef<Path>) -> Result<IndexStats> {
        let path = path.as_ref();
        let start = std::time::Instant::now();
        info!("Starting project indexing: {:?}", path);
        self.log_surrealdb_status("pre-parse");
        let file_config: codegraph_parser::file_collect::FileCollectionConfig =
            (&self.config).into();
        // FORCE REINDEX: Clean slate approach
        if self.config.force_reindex {
            info!("🧹 --force flag detected: Performing clean slate deletion");
            let storage = self.surreal.lock().await;
            storage.clean_project_data(&self.project_id).await?;
            drop(storage);
            info!("✅ Clean slate complete, starting fresh index");
        }
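        // Incremental indexing decision, three cases handled below:
        //   1. indexed + file metadata present → diff against stored hashes and
        //      reindex only added/modified files (deletions are purged first);
        //   2. indexed but no file metadata (older index) → full reindex;
        //   3. fresh project → index everything.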
        // INCREMENTAL INDEXING: Check if already indexed and has file metadata
        let files_to_index = if self.is_indexed(path).await? && self.has_file_metadata().await? {
            info!("📊 Project already indexed, checking for file changes...");
            // Collect current files (returns Vec<(PathBuf, u64)>)
            let files_with_sizes =
                codegraph_parser::file_collect::collect_source_files_with_config(
                    path,
                    &file_config,
                )?;
            // Extract just the paths for change detection
            let file_paths: Vec<PathBuf> =
                files_with_sizes.iter().map(|(p, _)| p.clone()).collect();
            // Detect changes
            let changes = self.detect_file_changes(&file_paths).await?;
            // Categorize changes
            let added: Vec<_> = changes
                .iter()
                .filter(|c| matches!(c.change_type, FileChangeType::Added))
                .collect();
            let modified: Vec<_> = changes
                .iter()
                .filter(|c| matches!(c.change_type, FileChangeType::Modified))
                .collect();
            let deleted: Vec<_> = changes
                .iter()
                .filter(|c| matches!(c.change_type, FileChangeType::Deleted))
                .collect();
            let unchanged: Vec<_> = changes
                .iter()
                .filter(|c| matches!(c.change_type, FileChangeType::Unchanged))
                .collect();
            info!(
                "📈 Change summary: {} added, {} modified, {} deleted, {} unchanged",
                added.len(),
                modified.len(),
                deleted.len(),
                unchanged.len()
            );
            // If no changes, skip indexing
            if added.is_empty() && modified.is_empty() && deleted.is_empty() {
                info!("✅ No changes detected, index is up to date");
                let stats = IndexStats {
                    skipped: unchanged.len(),
                    ..IndexStats::default()
                };
                self.shutdown_surreal_writer().await?;
                return Ok(stats);
            }
            // Handle deletions
            if !deleted.is_empty() {
                info!("🗑️ Removing data for {} deleted files", deleted.len());
                let deleted_paths: Vec<String> =
                    deleted.iter().map(|c| c.file_path.clone()).collect();
                self.delete_data_for_files(&deleted_paths).await?;
            }
            // Collect files that need re-indexing (added + modified) with their sizes
            let files_to_reindex: Vec<(PathBuf, u64)> = added
                .iter()
                .chain(modified.iter())
                .filter_map(|c| {
                    let path = PathBuf::from(&c.file_path);
                    // Find the file size from the original collection
                    files_with_sizes.iter().find(|(p, _)| p == &path).cloned()
                })
                .collect();
            if files_to_reindex.is_empty() {
                info!("✅ Only deletions processed, no files to index");
                let stats = IndexStats {
                    skipped: unchanged.len(),
                    ..IndexStats::default()
                };
                self.shutdown_surreal_writer().await?;
                return Ok(stats);
            }
            info!(
                "🔄 Incrementally indexing {} changed files (delete-then-insert semantics)",
                files_to_reindex.len()
            );
            files_to_reindex
        }
        // Old index without metadata - fall back to full reindex
        else if self.is_indexed(path).await? {
            warn!("⚠️ Project indexed without file metadata; falling back to a full index. Use --force for a clean reindex.");
            codegraph_parser::file_collect::collect_source_files_with_config(path, &file_config)?
        }
        // Fresh index - index all files
        else {
            codegraph_parser::file_collect::collect_source_files_with_config(path, &file_config)?
        };
        // STAGE 1: File Collection & Parsing
        let files = files_to_index;
        let total_files = files.len();
        // PRE-DELETE for modified files to avoid dupes/stale data
        let modified_paths: Vec<String> = files
            .iter()
            .map(|(p, _)| p.to_string_lossy().to_string())
            .collect();
        if !modified_paths.is_empty() && self.has_file_metadata().await.unwrap_or(false) {
            info!("🧹 Removing existing nodes/edges/file_metadata for changed files");
            self.delete_data_for_files(&modified_paths).await?;
        }
        // Single progress bar for unified AST + fast_ml extraction
        let ast_pb = self.create_progress_bar(
            total_files as u64,
            "🌳 Parsing & extracting (TreeSitter + FastML)",
        );
        // REVOLUTIONARY: Use unified extraction for nodes + edges in single pass (FASTEST approach)
        // Clone files for parsing (we need them again for metadata persistence)
        let (mut nodes, mut edges, pstats) = self
            .parse_files_with_unified_extraction(files.clone(), total_files as u64)
            .await?;
        ast_pb.finish_with_message(format!(
            "🌳 Parsed {} files | nodes: {} | edges: {}",
            pstats.parsed_files,
            nodes.len(),
            edges.len()
        ));
        // Generate deterministic IDs and build old_id -> new_id mapping to update edge references
        let mut id_mapping: std::collections::HashMap<NodeId, NodeId> =
            std::collections::HashMap::with_capacity(nodes.len());
        for node in nodes.iter_mut() {
            let old_id = node.id;
            node.set_deterministic_id(&self.project_id);
            id_mapping.insert(old_id, node.id);
            self.annotate_node(node);
        }
        // Update edge.from references to use new deterministic IDs
        for edge in edges.iter_mut() {
            if let Some(new_id) = id_mapping.get(&edge.from) {
                edge.from = *new_id;
            }
        }
        // Store counts for final summary (before consumption)
        let total_nodes_extracted = nodes.len();
        let total_edges_extracted = edges.len();
        // Build chunk plan early so we can annotate nodes with chunk counts before persistence
        #[cfg(feature = "embeddings")]
        let chunk_plan: ChunkPlan = {
            let start = std::time::Instant::now();
            let file_sources = self.load_file_sources(&files);
            let chunker = || {
                self.embedder
                    .chunk_nodes_with_sources(&nodes, &file_sources)
            };
            let plan = {
                let threads = std::env::var("RAYON_NUM_THREADS")
                    .ok()
                    .and_then(|v| v.parse::<usize>().ok())
                    .unwrap_or_else(|| self.config.workers.max(2));
                let pool = rayon::ThreadPoolBuilder::new()
                    .num_threads(threads)
                    .build()
                    .expect("failed to build local rayon pool for chunking");
                pool.install(chunker)
            };
            let elapsed = start.elapsed();
            info!(
                "🧩 Chunk plan built: {} nodes → {} chunks in {:.1?}",
                plan.stats.total_nodes, plan.stats.total_chunks, elapsed
            );
            if plan.stats.total_chunks == 0 && !nodes.is_empty() {
                warn!(
                    "Chunking produced zero chunks. Check CODEGRAPH_EMBEDDING_SKIP_CHUNKING, embedding provider availability, and model max_tokens settings."
                );
            }
            plan
        };
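        // Each node is annotated below with how many chunks it produced, presumably so
        // downstream consumers can tell whether a node was split for embedding.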
        #[cfg(not(feature = "embeddings"))]
        {
            info!("⚠️ Embeddings feature disabled at compile time; chunking skipped");
            let _chunk_plan: Option<()> = None;
        }
        #[cfg(feature = "embeddings")]
        {
            let mut node_chunk_counts: std::collections::HashMap<usize, usize> =
                std::collections::HashMap::new();
            for meta in &chunk_plan.metas {
                *node_chunk_counts.entry(meta.node_index).or_insert(0) += 1;
            }
            for (idx, node) in nodes.iter_mut().enumerate() {
                let count = node_chunk_counts.get(&idx).cloned().unwrap_or(0);
                node.metadata
                    .attributes
                    .insert("chunk_count".to_string(), count.to_string());
            }
        }
        let success_rate = if pstats.total_files > 0 {
            (pstats.parsed_files as f64 / pstats.total_files as f64) * 100.0
        } else {
            100.0
        };
        let parse_completion_msg = format!(
            "🌳 Unified fast_ml + AST extraction complete: {}/{} files (✅ {:.1}% success) | 📊 {} nodes + {} edges | ⚡ {:.0} lines/s",
            pstats.parsed_files,
            pstats.total_files,
            success_rate,
            total_nodes_extracted,
            total_edges_extracted,
            pstats.lines_per_second
        );
        // Enhanced parsing statistics
        info!("🌳 TreeSitter AST parsing results:");
        info!(
            " 📊 Semantic nodes extracted: {} (functions, structs, classes, etc.)",
            total_nodes_extracted
        );
        info!(
            " 🔗 Code relationships extracted: {} (calls, imports, dependencies)",
            total_edges_extracted
        );
        info!(
            " 📈 Extraction efficiency: {:.1} nodes/file | {:.1} edges/file",
            total_nodes_extracted as f64 / pstats.parsed_files.max(1) as f64,
            total_edges_extracted as f64 / pstats.parsed_files.max(1) as f64
        );
        info!(
            " 🎯 Sample nodes: {:?}",
            nodes.iter().take(3).map(|n| &n.name).collect::<Vec<_>>()
        );
        self.log_surrealdb_status("post-parse");
        if nodes.is_empty() {
            warn!("No nodes generated from parsing! Check parser implementation.");
            warn!(
                "Parsing stats: {} files, {} lines processed",
                pstats.parsed_files, pstats.total_lines
            );
        }
        // STAGE 4: Persist nodes before embedding so SurrealDB reflects progress
        let store_nodes_pb = self.create_progress_bar(nodes.len() as u64, "📈 Storing nodes");
        let mut stats = IndexStats {
            files: pstats.parsed_files,
            skipped: pstats.total_files - pstats.parsed_files,
            ..Default::default()
        };
        let mut symbol_map: std::collections::HashMap<String, NodeId> =
            std::collections::HashMap::new();
        for node in nodes.iter() {
            match node.node_type {
                Some(NodeType::Function) => stats.functions += 1,
                Some(NodeType::Class) => stats.classes += 1,
                Some(NodeType::Struct) => stats.structs += 1,
                Some(NodeType::Trait) => stats.traits += 1,
                _ => {}
            }
            if let Some(ref c) = node.content {
                stats.lines += c.lines().count();
            }
            extend_symbol_index(&mut symbol_map, node);
        }
        let storage_batch = self.config.batch_size.max(1);
        for chunk in nodes.chunks(storage_batch) {
            self.persist_nodes_batch(chunk).await?;
            store_nodes_pb.inc(chunk.len() as u64);
        }
        self.flush_surreal_writer().await?;
        store_nodes_pb.finish_with_message("📈 Nodes stored");
        self.log_surreal_node_count(total_nodes_extracted).await;
        #[cfg(feature = "embeddings")]
        let total_chunks = chunk_plan.chunks.len() as u64;
        #[cfg(not(feature = "embeddings"))]
        let total_chunks = 0u64;
        let embed_pb = self.create_batch_progress_bar(
            total_chunks,
            self.config.batch_size,
            "🧠 Embedding chunks (vector batch)",
        );
        let chunk_store_pb = self.create_batch_progress_bar(
            total_chunks,
            self.config.batch_size,
            "🧩 Persisting chunk embeddings",
        );
        let batch = self.config.batch_size.max(1);
        info!(
            " ⚙️ Effective embedding batch size: {} (CODEGRAPH_EMBEDDINGS_BATCH_SIZE clamped to 512 for DB writes)",
            batch
        );
        #[allow(unused_mut)]
        #[cfg(feature = "embeddings")]
        let mut processed: u64;
        #[cfg(not(feature = "embeddings"))]
        let processed: u64 = 0;
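        // Embedding below is pipelined: chunk batches are embedded with up to
        // `max_concurrent` requests in flight (buffer_unordered), and finished
        // batches are handed straight to the background writer in DB-sized slices.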
"embeddings"))] let processed: u64 = 0; // Enhanced embedding phase logging let provider = &self.global_config.embedding.provider; info!("💾 Starting semantic embedding generation:"); info!( " 🤖 Provider: {} ({}-dimensional embeddings)", provider, self.vector_dim ); info!( " 🗄️ SurrealDB column: {}", self.embedding_column.column_name() ); let total_nodes = nodes.len() as u64; info!(" 📊 Nodes to embed: {} semantic entities", total_nodes); info!( " ⚡ Batch size: {} (optimized for {} system)", batch, self.estimate_system_memory() ); info!(" 🎯 Target: Enable similarity search and AI-powered analysis"); // Embed chunks and persist chunk embeddings #[cfg(feature = "embeddings")] { use futures::stream::{self, StreamExt}; use std::sync::atomic::{AtomicU64, Ordering}; // To avoid giant DB payloads, tie chunk grouping to the DB batch size (which may be lower // than the embedding batch size). This keeps both embedding and DB writes in smaller slices. // Also ensure we never exceed the embedding batch size, so the embedder isn’t overfed. let chunk_batch_size = chunk_embedding_db_batch_size(batch).min(batch); let chunk_batches: Vec<(Vec<String>, Vec<ChunkMeta>)> = { let mut batches = Vec::new(); let mut chunk_iter = chunk_plan.chunks.chunks(chunk_batch_size); let mut meta_iter = chunk_plan.metas.chunks(chunk_batch_size); while let (Some(chunk_batch), Some(meta_batch)) = (chunk_iter.next(), meta_iter.next()) { let texts: Vec<String> = chunk_batch.iter().map(|c| c.text.clone()).collect(); batches.push((texts, meta_batch.to_vec())); } batches }; let processed_atomic = Arc::new(AtomicU64::new(0)); let max_concurrent = self.config.max_concurrent.max(1); // DB batch size: prefer explicit env override, otherwise match embedding batch size let chunk_db_batch = chunk_embedding_db_batch_size(batch); let embedder = &self.embedder; let mut batch_stream = stream::iter(chunk_batches.into_iter().map(|(texts, metas)| async move { let embs = embedder.embed_texts_batched(&texts).await; (texts, metas, embs) })) .buffer_unordered(max_concurrent); while let Some((texts, metas, embs_result)) = batch_stream.next().await { let embs: Vec<Vec<f32>> = embs_result?; if embs.len() != metas.len() { warn!( "Chunk embedding batch size mismatch: got {} embeddings for {} metas", embs.len(), metas.len() ); } let mut records: Vec<ChunkEmbeddingRecord> = Vec::with_capacity(metas.len()); for ((meta, text), emb) in metas.iter().zip(texts.iter()).zip(embs.iter()) { if let Some(node) = nodes.get(meta.node_index) { if emb.len() != self.vector_dim { warn!( "Skipping chunk embedding: dimension mismatch (got {}, expected {}) for node {} chunk {}", emb.len(), self.vector_dim, node.id, meta.chunk_index ); continue; } records.push(ChunkEmbeddingRecord::new( &node.id.to_string(), meta.chunk_index, text.clone(), emb, &self.embedding_model, self.embedding_column.column_name(), &self.project_id, )); } } for batch in records.chunks(chunk_db_batch) { self.enqueue_chunk_embeddings(batch.to_vec()).await?; } let done = processed_atomic.fetch_add(metas.len() as u64, Ordering::Relaxed) + metas.len() as u64; embed_pb.set_position(done.min(total_chunks)); chunk_store_pb.set_position(done.min(total_chunks)); } processed = processed_atomic.load(Ordering::Relaxed); } #[cfg(not(feature = "embeddings"))] let processed: u64 = 0; let embedding_rate = if total_chunks > 0 { processed as f64 / total_chunks as f64 * 100.0 } else { 100.0 }; let provider = &self.global_config.embedding.provider; let embed_completion_msg = format!( "💾 Semantic embeddings complete: {}/{} 
        embed_pb.finish_with_message(embed_completion_msg);
        chunk_store_pb.finish_with_message(format!(
            "🧩 Chunk embeddings queued for persistence: {}/{} chunks",
            total_chunks, total_chunks
        ));
        #[cfg(feature = "embeddings")]
        {
            // Ensure all chunk embeddings are flushed to SurrealDB before continuing
            self.flush_surreal_writer().await?;
            self.log_surreal_chunk_count(total_chunks as usize).await;
        }
        // Node embedding: keep pooled embedding as average of its chunks
        #[cfg(feature = "embeddings")]
        {
            // For now, skip storing node-level embedding when chunking is enabled.
            // Retrieval should use chunk embeddings directly.
        }
        stats.embeddings = processed as usize;
        // Enhanced embedding completion statistics
        info!("💾 Semantic embedding generation results:");
        info!(
            " 🎯 Vector search enabled: {} nodes embedded for similarity matching",
            processed
        );
        info!(" 📐 Embedding dimensions: {}", self.vector_dim);
        info!(
            " 🤖 Provider performance: {} with batch optimization",
            provider
        );
        info!(" 🔍 Capabilities unlocked: Vector search, semantic analysis, AI-powered tools");
        // CRITICAL FIX: Preserve the working ONNX embedding session for AI semantic matching.
        // The original reset caused fresh embedder creation to fail with ONNX resource conflicts,
        // falling back to random hash embeddings (0% AI effectiveness).
        // Keeping the working ONNX session ensures real embeddings for AI semantic matching.
        // Tradeoff: slightly more memory usage during post-processing (acceptable on M4 Max).
        #[cfg(feature = "embeddings")]
        {
            // self.embedder = codegraph_vector::EmbeddingGenerator::default();
            tracing::info!("🔧 Preserving working ONNX embedder session for AI semantic matching");
        }
        tracing::info!(
            target: "codegraph_mcp::indexer",
            "Vector indexing handled by SurrealDB; local FAISS generation removed"
        );
        // REVOLUTIONARY: Store edges extracted during unified parsing (MAXIMUM SPEED)
        let stored_edges;
        let edge_count = edges.len();
        let resolution_rate;
        {
            let edge_pb =
                self.create_progress_bar(edges.len() as u64, "🔗 Resolving & Storing Dependencies");
            let edge_count = edges.len();
            info!("🔗 Starting dependency relationship storage:");
            info!(
                " 📊 Raw relationships extracted: {} (calls, imports, dependencies)",
                edge_count
            );
            info!(
                " 🎯 Symbol resolution map: {} unique symbols available",
                symbol_map.len()
            );
            info!(
                " 🧠 AI-enhanced resolution: {} feature active",
                if cfg!(feature = "ai-enhanced") {
                    "Semantic similarity"
                } else {
                    "Pattern matching only"
                }
            );
            info!(" 🔍 Resolution methods: Exact match → Simple name → Case variants → AI similarity");
            info!(" 🚀 M4 Max optimization: Parallel processing with bulk database operations");
            // REVOLUTIONARY: Parallel symbol resolution optimized for M4 Max 128GB
            let chunk_size = (edges.len() / 12).max(100).min(1000); // Optimal for 12+ cores
            let chunks: Vec<_> = edges.chunks(chunk_size).collect();
            let total_chunks = chunks.len();
            info!(
                "⚡ Parallel processing: {} edge chunks across {} cores",
                total_chunks,
                num_cpus::get()
            );
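            // 2-phase AI preparation (block below): phase 1 embeds every known symbol,
            // phase 2 embeds every edge target that failed map lookup, so resolution can
            // compare unresolved targets against known symbols entirely from memory.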
            // REVOLUTIONARY: Pre-generate AI embeddings for BOTH known symbols AND unresolved edge targets
            #[cfg(feature = "ai-enhanced")]
            let (symbol_embeddings, unresolved_embeddings) = {
                info!("🚀 INITIALIZING REVOLUTIONARY 2-PHASE AI SEMANTIC MATCHING");
                info!(
                    "🔧 Phase 1: Pre-computing embeddings for {} known symbols",
                    symbol_map.len()
                );
                // Phase 1: Known symbol embeddings
                let known_embeddings = match self.precompute_symbol_embeddings(&symbol_map).await {
                    embeddings if !embeddings.is_empty() => {
                        info!(
                            "✅ Known symbol embeddings ready: {} pre-computed",
                            embeddings.len()
                        );
                        embeddings
                    }
                    _ => {
                        warn!(
                            "⚠️ Known symbol embedding failed - falling back to empty embeddings"
                        );
                        std::collections::HashMap::new()
                    }
                };
                // Phase 2: Pre-compute embeddings for ALL unresolved edge targets
                info!("🔧 Phase 2: Pre-computing embeddings for unresolved edge targets");
                let mut unresolved_symbols: std::collections::HashSet<String> =
                    std::collections::HashSet::new();
                let mut unresolved_symbol_edge_ids: std::collections::HashMap<String, uuid::Uuid> =
                    std::collections::HashMap::new();
                for edge in edges.iter() {
                    if !symbol_map.contains_key(&edge.to) {
                        unresolved_symbols.insert(edge.to.clone());
                        unresolved_symbol_edge_ids
                            .entry(edge.to.clone())
                            .or_insert_with(uuid::Uuid::new_v4);
                    }
                }
                info!(
                    "📊 Discovered {} unique unresolved symbols for AI embedding",
                    unresolved_symbols.len()
                );
                let unresolved_embeddings = if !unresolved_symbols.is_empty() {
                    // PROFESSIONAL: Direct embedding generation for unresolved symbols (no fake NodeIds needed)
                    match self
                        .precompute_unresolved_symbol_embeddings(
                            &unresolved_symbols,
                            &unresolved_symbol_edge_ids,
                        )
                        .await
                    {
                        embeddings if !embeddings.is_empty() => {
                            info!(
                                "✅ Unresolved symbol embeddings ready: {} pre-computed",
                                embeddings.len()
                            );
                            embeddings
                        }
                        _ => {
                            warn!("⚠️ Unresolved symbol embedding failed - AI matching will be limited");
                            std::collections::HashMap::new()
                        }
                    }
                } else {
                    std::collections::HashMap::new()
                };
                info!(
                    "🤖 REVOLUTIONARY AI READY: {} known + {} unresolved = {} total embeddings",
                    known_embeddings.len(),
                    unresolved_embeddings.len(),
                    known_embeddings.len() + unresolved_embeddings.len()
                );
                (known_embeddings, unresolved_embeddings)
            };
            #[cfg(not(feature = "ai-enhanced"))]
            let (symbol_embeddings, unresolved_embeddings): (
                std::collections::HashMap<String, Vec<f32>>,
                std::collections::HashMap<String, Vec<f32>>,
            ) = {
                info!("🚀 Pattern-only resolution: AI semantic matching disabled (ai-enhanced feature not enabled)");
                (
                    std::collections::HashMap::new(),
                    std::collections::HashMap::new(),
                )
            };
            // Flush symbol embeddings to database before edge resolution
            self.flush_surreal_writer().await?;
            let mut unresolved_edges = 0;
            let mut exact_matches = 0;
            let mut pattern_matches = 0;
            #[cfg(feature = "ai-enhanced")]
            let mut ai_matches = 0;
            let resolution_start = std::time::Instant::now();
            // REVOLUTIONARY: Parallel symbol resolution for M4 Max performance
            use std::sync::atomic::{AtomicUsize, Ordering};
            let processed_chunks = AtomicUsize::new(0);
            let total_resolved = AtomicUsize::new(0);
            // Process all chunks in parallel using M4 Max cores
            let unresolved_samples: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
            let chunk_results: Vec<_> = chunks
                .par_iter()
                .enumerate()
                .map(|(chunk_idx, chunk)| {
                    let unresolved_samples = unresolved_samples.clone();
                    let mut chunk_resolved = Vec::new();
                    let mut chunk_stats = (0, 0, 0, 0); // (exact, pattern, ai, unresolved)
                    for edge_rel in chunk.iter() {
                        let mut target_id = None;
                        let mut resolution_type = "unresolved";
                        // Rust-first normalization
                        for variant in Self::normalize_rust_symbol(&edge_rel.to) {
                            if let Some(&id) = symbol_map.get(&variant) {
                                target_id = Some(id);
                                resolution_type = "normalized";
                                break;
                            }
                        }
                        // Python normalization (fallback to string-based)
                        if target_id.is_none() {
                            for variant in Self::normalize_python_symbol(&edge_rel.to) {
                                if let Some(&id) = symbol_map.get(&variant) {
                                    target_id = Some(id);
                                    resolution_type = "normalized";
                                    break;
                                }
                            }
                        }
                        if target_id.is_none() {
                            for variant in Self::normalize_symbol_target(&edge_rel.to) {
                                if let Some(&id) = symbol_map.get(&variant) {
                                    target_id = Some(id);
                                    resolution_type = "normalized";
                                    break;
                                }
                            }
                        }
                        if target_id.is_none() {
                            if let Some(simple_name) = edge_rel.to.split("::").last() {
                                if let Some(&id) = symbol_map.get(simple_name) {
                                    target_id = Some(id);
                                    resolution_type = "simple_name";
                                }
                            }
                        }
                        if let Some(target_id) = target_id {
                            // Track resolution method for statistics
                            match resolution_type {
                                "normalized" | "exact" => chunk_stats.0 += 1,
                                "simple_name" => chunk_stats.1 += 1,
                                _ => {}
                            }
                            // Collect resolved edge for bulk storage
                            chunk_resolved.push((
                                edge_rel.from,
                                target_id,
                                edge_rel.edge_type.clone(),
                                edge_rel.metadata.clone(),
                            ));
                        } else {
                            // REVOLUTIONARY: Real AI semantic matching using BOTH known + unresolved embeddings
                            #[cfg(feature = "ai-enhanced")]
                            {
                                if let Some(best_match) = Self::ai_semantic_match_sync(
                                    &edge_rel.to,
                                    &symbol_map,
                                    &symbol_embeddings,
                                    &unresolved_embeddings,
                                ) {
                                    chunk_stats.2 += 1; // AI match count
                                    chunk_resolved.push((
                                        edge_rel.from,
                                        best_match,
                                        edge_rel.edge_type.clone(),
                                        edge_rel.metadata.clone(),
                                    ));
                                } else {
                                    chunk_stats.3 += 1; // Unresolved count
                                    // Record a few unresolved targets for the
                                    // CODEGRAPH_DEBUG sample log further down.
                                    if let Ok(mut samples) = unresolved_samples.lock() {
                                        if samples.len() < 10 {
                                            samples.push(edge_rel.to.clone());
                                        }
                                    }
                                }
                            }
                            #[cfg(not(feature = "ai-enhanced"))]
                            {
                                chunk_stats.3 += 1; // Unresolved count
                                if let Ok(mut samples) = unresolved_samples.lock() {
                                    if samples.len() < 10 {
                                        samples.push(edge_rel.to.clone());
                                    }
                                }
                            }
                        }
                    }
                    // Enhanced progress tracking with ETA for M4 Max visibility
                    let chunks_done = processed_chunks.fetch_add(1, Ordering::Relaxed) + 1;
                    if chunks_done % 3 == 0 || chunks_done == total_chunks {
                        let resolved_so_far =
                            total_resolved.fetch_add(chunk_resolved.len(), Ordering::Relaxed);
                        edge_pb.set_position(resolved_so_far as u64);
                        if chunks_done % 5 == 0 {
                            let elapsed = resolution_start.elapsed().as_secs_f64();
                            let rate = resolved_so_far as f64 / elapsed;
                            let remaining = edge_count - resolved_so_far;
                            let eta = if rate > 0.0 {
                                remaining as f64 / rate
                            } else {
                                0.0
                            };
                            info!(
                                "⚡ M4 Max parallel: {}/{} chunks | {} edges/s | ETA: {:.1}s",
                                chunks_done, total_chunks, rate as usize, eta
                            );
                        }
                    }
                    (chunk_resolved, chunk_stats)
                })
                .collect();
            // Aggregate statistics and resolved edges
            let mut all_resolved_edges = Vec::new();
            for (chunk_edges, (exact, pattern, ai, unresolved)) in chunk_results {
                exact_matches += exact;
                pattern_matches += pattern;
                #[cfg(feature = "ai-enhanced")]
                {
                    ai_matches += ai;
                }
                unresolved_edges += unresolved;
                all_resolved_edges.extend(chunk_edges);
            }
            let mut stored_edges_local = 0usize;
            let mut resolution_rate_local = 0.0;
            // Store resolved edges via writer
            if !all_resolved_edges.is_empty() {
                let serializable_edges: Vec<_> = all_resolved_edges
                    .iter()
                    .map(
                        |(from, to, edge_type, metadata)| codegraph_graph::edge::CodeEdge {
                            id: uuid::Uuid::new_v4(),
                            from: *from,
                            to: *to,
                            edge_type: edge_type.clone(),
                            weight: 1.0,
                            metadata: metadata.clone(),
                        },
                    )
                    .collect();
                stored_edges_local = serializable_edges.len();
                if let Err(err) = self.enqueue_edges(serializable_edges).await {
                    warn!("⚠️ Failed to store resolved edges: {}", err);
                }
                let resolution_time = resolution_start.elapsed();
                resolution_rate_local = (stored_edges_local as f64 / edge_count as f64) * 100.0;
                let edge_msg = format!(
                    "🔗 Dependencies resolved: {}/{} relationships ({:.1}% success) | ⚡ {:.1}s",
                    stored_edges_local,
                    edge_count,
                    resolution_rate_local,
                    resolution_time.as_secs_f64()
                );
                edge_pb.finish_with_message(edge_msg);
                info!("🔗 M4 MAX PARALLEL PROCESSING RESULTS:");
                info!(
                    " ✅ Successfully stored: {} edges ({:.1}% of extracted relationships)",
                    stored_edges_local, resolution_rate_local
                );
                info!(
                    " 🎯 Exact matches: {} (direct symbol found)",
                    exact_matches
                );
                info!(
                    " 🔄 Pattern matches: {} (simplified/cleaned symbols)",
                    pattern_matches
                );
                #[cfg(feature = "ai-enhanced")]
                info!(
                    " 🧠 AI semantic matches: {} (similarity-based resolution)",
                    ai_matches
                );
                info!(
                    " ❌ Unresolved: {} (external dependencies/dynamic calls)",
                    unresolved_edges
                );
                if let Ok(samples) = unresolved_samples.lock() {
                    if !samples.is_empty() && std::env::var("CODEGRAPH_DEBUG").is_ok() {
                        info!(" 🔍 Sample unresolved targets: {:?}", *samples);
                    }
                }
                info!(
                    " ⚡ M4 Max performance: {:.0} edges/s ({} cores utilized)",
                    edge_count as f64 / resolution_time.as_secs_f64(),
                    num_cpus::get()
                );
                info!(
                    " 🚀 Parallel efficiency: {} chunks processed across {} cores",
                    total_chunks,
                    num_cpus::get()
                );
                if resolution_rate_local >= 80.0 {
                    info!(
                        "🎉 EXCELLENT: {:.1}% resolution rate achieved!",
                        resolution_rate_local
                    );
                } else if resolution_rate_local >= 60.0 {
                    info!(
                        "👍 GOOD: {:.1}% resolution rate. Consider enabling ai-enhanced or improving symbol normalization.",
                        resolution_rate_local
                    );
                } else {
                    warn!(
                        "⚠️ LOW resolution rate: {:.1}%. Check normalization and embeddings.",
                        resolution_rate_local
                    );
                }
            } else {
                edge_pb.finish_with_message("No resolved edges to store");
            }
            // Assign values for use outside the block
            stored_edges = stored_edges_local;
            resolution_rate = resolution_rate_local;
        }
        // ELIMINATED: No separate edge processing phase needed - edges extracted during parsing!
        self.log_surreal_edge_count(stored_edges).await;
        // Persist project metadata summary into SurrealDB
        self.persist_project_metadata(&stats, total_nodes_extracted, total_edges_extracted)
            .await?;
        self.flush_surreal_writer().await?;
        // Task 3.3: Update file metadata for incremental indexing
        info!("💾 Updating file metadata for change tracking");
        let file_paths_only: Vec<PathBuf> = files.iter().map(|(p, _)| p.clone()).collect();
        self.persist_file_metadata(&file_paths_only, &nodes, &edges)
            .await?;
        self.flush_surreal_writer().await?;
        self.verify_file_metadata_count(file_paths_only.len())
            .await?;
        self.verify_project_metadata_present().await?;
        // COMPREHENSIVE INDEXING COMPLETION SUMMARY
        let avg_nodes_per_file = if stats.files > 0 {
            total_nodes_extracted as f64 / stats.files as f64
        } else {
            0.0
        };
        let avg_edges_per_file = if stats.files > 0 {
            total_edges_extracted as f64 / stats.files as f64
        } else {
            0.0
        };
        let avg_embeddings_per_node = if total_nodes_extracted > 0 {
            stats.embeddings as f64 / total_nodes_extracted as f64
        } else {
            0.0
        };
        let total_elapsed = start.elapsed().as_secs_f64();
        info!("🎉 INDEXING COMPLETE");
        info!(
            "📂 Files {} ({} skipped) | Lines {} | Time {:.1}s",
            pstats.parsed_files, stats.skipped, stats.lines, total_elapsed
        );
        info!(
            "🌳 Graph coverage: nodes {} | edges {} | nodes/file {:.1} | edges/file {:.1} | resolved {:.1}%",
            total_nodes_extracted,
            stored_edges,
            avg_nodes_per_file,
            avg_edges_per_file,
            resolution_rate
        );
        info!(
            "🧠 Embeddings: chunks {} | dim {} | provider {} | per-node {:.1}",
            stats.embeddings, self.vector_dim, provider, avg_embeddings_per_node
        );
        info!(
            "⚡ Throughput: {:.1} files/s | {:.1} nodes/s | {:.1} edges/s",
            pstats.parsed_files as f64 / total_elapsed.max(1e-3),
            total_nodes_extracted as f64 / total_elapsed.max(1e-3),
            stored_edges as f64 / total_elapsed.max(1e-3)
        );
        // Populate extended stats for CLI reporting
        stats.nodes = total_nodes_extracted;
        stats.edges = stored_edges;
        stats.chunks = stats.embeddings; // chunks == embeddings in current impl
        stats.embedding_dimension = self.vector_dim;
        stats.embedding_provider = provider.clone();
        self.shutdown_surreal_writer().await?;
        Ok(stats)
    }
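
    // Reader note: everything above is the end-to-end pipeline (parse → deterministic
    // IDs → chunk → persist nodes → embed → resolve edges → metadata); the methods
    // below are the helpers it calls.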
    /// REVOLUTIONARY: Parse files with unified node+edge extraction for maximum speed
    async fn parse_files_with_unified_extraction(
        &self,
        files: Vec<(PathBuf, u64)>,
        total_files: u64,
    ) -> Result<(
        Vec<CodeNode>,
        Vec<codegraph_core::EdgeRelationship>,
        codegraph_parser::ParsingStatistics,
    )> {
        shared_unified_parse(&self.parser, files, total_files).await
    }

    /// Estimate available system memory for informative logging
    fn estimate_system_memory(&self) -> String {
        #[cfg(target_os = "macos")]
        {
            if let Ok(output) = std::process::Command::new("sysctl")
                .args(["-n", "hw.memsize"])
                .output()
            {
                if let Ok(memsize_str) = String::from_utf8(output.stdout) {
                    if let Ok(memsize) = memsize_str.trim().parse::<u64>() {
                        let gb = memsize / 1024 / 1024 / 1024;
                        return format!("{}GB", gb);
                    }
                }
            }
        }
        #[cfg(target_os = "linux")]
        {
            if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") {
                if let Some(line) = contents.lines().find(|line| line.starts_with("MemTotal:")) {
                    if let Some(kb_str) = line.split_whitespace().nth(1) {
                        if let Ok(kb) = kb_str.parse::<u64>() {
                            let gb = kb / 1024 / 1024;
                            return format!("{}GB", gb);
                        }
                    }
                }
            }
        }
        "Unknown".to_string()
    }

    /// Pre-compute embeddings for all symbols for M4 Max performance optimization
    #[cfg(feature = "ai-enhanced")]
    async fn precompute_symbol_embeddings(
        &self,
        symbol_map: &std::collections::HashMap<String, NodeId>,
    ) -> std::collections::HashMap<String, Vec<f32>> {
        info!("🧠 Pre-computing symbol embeddings for M4 Max AI optimization");
        info!(
            "🔧 DEBUG: precompute_symbol_embeddings called with {} symbols",
            symbol_map.len()
        );
        let mut embeddings = std::collections::HashMap::new();
        // Early validation
        if symbol_map.is_empty() {
            warn!("⚠️ Empty symbol map - skipping AI embedding pre-computation");
            return embeddings;
        }
        // Get ALL symbols for maximum AI resolution coverage (M4 Max can handle it)
        let top_symbols: Vec<_> = symbol_map.keys().cloned().collect();
        info!(
            "📊 Selected {} top symbols for AI embedding pre-computation",
            top_symbols.len()
        );
        // ARCHITECTURAL IMPROVEMENT: Use the existing working embedder instead of creating a fresh one.
        // This avoids re-initialization issues that could cause random hash fallback.
        info!(
            "🤖 Using configured embedder ({}) for AI semantic matching",
            self.global_config.embedding.provider
        );
        let embedder = &self.embedder;
        info!("✅ Using working embedder session (guaranteed real embeddings)");
        let (batch_size, max_concurrent) = self.symbol_embedding_batch_settings();
        let total_batches = (top_symbols.len() + batch_size - 1) / batch_size;
        info!(
            "⚡ Embedding batch size: {} symbols ({} batches, max {} concurrent)",
            batch_size,
            total_batches.max(1),
            max_concurrent
        );
        // Create progress bar for symbol embedding generation
        let symbol_pb = self.create_batch_progress_bar(
            top_symbols.len() as u64,
            batch_size,
            "🧩 Symbol embeddings",
        );
        let batches: Vec<Vec<String>> = top_symbols
            .chunks(batch_size)
            .map(|chunk| chunk.iter().cloned().collect())
            .collect();
        let mut processed = 0usize;
        let mut batch_stream = stream::iter(batches.into_iter().map(|batch| {
            let embedder = embedder;
            async move {
                let result = embedder.embed_texts_batched(&batch).await;
                (batch, result)
            }
        }))
        .buffer_unordered(max_concurrent);
        while let Some((batch, result)) = batch_stream.next().await {
            match result {
                Ok(batch_embeddings) => {
                    let mut records = Vec::with_capacity(batch.len());
                    for (symbol, embedding) in
                        batch.iter().cloned().zip(batch_embeddings.into_iter())
                    {
                        records.push(self.build_symbol_embedding_record(
                            &symbol,
                            symbol_map.get(&symbol).cloned(),
                            None,
                            &embedding,
                        ));
                        embeddings.insert(symbol, embedding);
                        processed += 1;
                    }
                    if let Err(err) = self.persist_symbol_embedding_records(records).await {
                        warn!("⚠️ Failed to persist batch symbol embeddings: {}", err);
                    }
                    symbol_pb.set_position(processed as u64);
                }
                Err(e) => {
                    warn!(
                        "⚠️ Batch embedding failed for {} symbols: {}. Falling back to individual processing.",
                        batch.len(),
                        e
                    );
                    for symbol in batch.into_iter() {
                        match embedder.generate_text_embedding(&symbol).await {
                            Ok(embedding) => {
                                let record = self.build_symbol_embedding_record(
                                    &symbol,
                                    symbol_map.get(&symbol).cloned(),
                                    None,
                                    &embedding,
                                );
                                if let Err(err) =
                                    self.persist_symbol_embedding_records(vec![record]).await
                                {
                                    warn!(
                                        "⚠️ Failed to persist symbol embedding for '{}': {}",
                                        symbol, err
                                    );
                                }
                                embeddings.insert(symbol, embedding);
                                processed += 1;
                                symbol_pb.set_position(processed as u64);
                            }
                            Err(err) => {
                                warn!(
                                    "⚠️ Failed to generate embedding for symbol '{}': {}",
                                    symbol, err
                                );
                            }
                        }
                    }
                }
            }
        }
        // Finish progress bar with summary
        let provider = &self.global_config.embedding.provider;
        let success_rate = if top_symbols.len() > 0 {
            embeddings.len() as f64 / top_symbols.len() as f64 * 100.0
        } else {
            100.0
        };
        let completion_msg = format!(
            "🧠 Symbol embeddings complete: {}/{} symbols (✅ {:.1}% success) | 🤖 {} | 🔗 AI semantic matching ready",
            embeddings.len(),
            top_symbols.len(),
            success_rate,
            provider
        );
        symbol_pb.finish_with_message(completion_msg);
        info!(
            "🧠 Pre-computed {} symbol embeddings for fast AI resolution",
            embeddings.len()
        );
        if embeddings.is_empty() {
            warn!("⚠️ No symbol embeddings were generated - AI matching will be disabled");
            warn!(
                "🔍 Debug: top_symbols.len()={}, batches attempted={}",
                top_symbols.len(),
                (top_symbols.len() + batch_size - 1) / batch_size
            );
        } else {
            info!(
                "✅ AI semantic matching ready with {:.1}% coverage ({}/{})",
                embeddings.len() as f64 / symbol_map.len() as f64 * 100.0,
                embeddings.len(),
                symbol_map.len()
            );
            info!(
                "🤖 AI SEMANTIC MATCHING ACTIVATED: First call with {} pre-computed embeddings",
                embeddings.len()
            );
        }
        embeddings
    }

    /// REVOLUTIONARY: Pre-compute embeddings directly for unresolved symbols (professional batching)
    #[cfg(feature = "ai-enhanced")]
    async fn precompute_unresolved_symbol_embeddings(
        &self,
        unresolved_symbols: &std::collections::HashSet<String>,
        symbol_edge_ids: &std::collections::HashMap<String, uuid::Uuid>,
    ) -> std::collections::HashMap<String, Vec<f32>> {
        use codegraph_vector::EmbeddingGenerator;
        info!("🧠 Pre-computing unresolved symbol embeddings for professional-grade AI");
        info!(
            "🔧 Processing {} unique unresolved symbols",
            unresolved_symbols.len()
        );
        let mut embeddings = std::collections::HashMap::new();
        if unresolved_symbols.is_empty() {
            return embeddings;
        }
        let symbols_vec: Vec<_> = unresolved_symbols.iter().cloned().collect();
        let embedder = &self.embedder;
        let (batch_size, max_concurrent) = self.symbol_embedding_batch_settings();
        let total_batches = (symbols_vec.len() + batch_size - 1) / batch_size;
        info!(
            "⚡ Unresolved embedding batch size: {} symbols ({} batches, max {} concurrent)",
            batch_size,
            total_batches.max(1),
            max_concurrent
        );
        // Create progress bar for unresolved symbol embedding generation
        let unresolved_pb = self.create_batch_progress_bar(
            symbols_vec.len() as u64,
            batch_size,
            "🧩 Unresolved symbol embeddings",
        );
        let batches: Vec<Vec<String>> = symbols_vec
            .chunks(batch_size)
            .map(|chunk| chunk.iter().cloned().collect())
            .collect();
        let mut batch_stream = stream::iter(batches.into_iter().map(|batch| {
            let embedder = embedder;
            async move {
                let result = embedder.embed_texts_batched(&batch).await;
                (batch, result)
            }
        }))
        .buffer_unordered(max_concurrent);
        while let Some((batch, result)) = batch_stream.next().await {
            match result {
                Ok(batch_embeddings) => {
                    let mut records = Vec::with_capacity(batch.len());
                    for (symbol, embedding) in
                        batch.iter().cloned().zip(batch_embeddings.into_iter())
                    {
                        let edge_id_ref = symbol_edge_ids.get(&symbol).map(|id| id.to_string());
                        let edge_id_ref = edge_id_ref.as_deref();
                        records.push(self.build_symbol_embedding_record(
                            &symbol,
                            None,
                            edge_id_ref,
                            &embedding,
                        ));
                        embeddings.insert(symbol, embedding);
                    }
                    if let Err(err) = self.persist_symbol_embedding_records(records).await {
                        warn!(
                            "⚠️ Failed to persist unresolved symbol embeddings batch: {}",
                            err
                        );
                    }
                    unresolved_pb.set_position(embeddings.len() as u64);
                }
                Err(e) => {
                    warn!(
                        "⚠️ Batch embedding failed for {} unresolved symbols: {}. Falling back to individual processing.",
                        batch.len(),
                        e
                    );
                    for symbol in batch.into_iter() {
                        match embedder.generate_text_embedding(&symbol).await {
                            Ok(embedding) => {
                                let edge_id_ref =
                                    symbol_edge_ids.get(&symbol).map(|id| id.to_string());
                                let edge_id_ref = edge_id_ref.as_deref();
                                let record = self.build_symbol_embedding_record(
                                    &symbol,
                                    None,
                                    edge_id_ref,
                                    &embedding,
                                );
                                if let Err(err) =
                                    self.persist_symbol_embedding_records(vec![record]).await
                                {
                                    warn!(
                                        "⚠️ Failed to persist unresolved symbol embedding '{}': {}",
                                        symbol, err
                                    );
                                }
                                embeddings.insert(symbol, embedding);
                                unresolved_pb.set_position(embeddings.len() as u64);
                            }
                            Err(err) => {
                                warn!(
                                    "⚠️ Failed to generate embedding for unresolved symbol '{}': {}",
                                    symbol, err
                                );
                            }
                        }
                    }
                }
            }
        }
        // Finish progress bar with summary
        let provider = &self.global_config.embedding.provider;
        let success_rate = if symbols_vec.len() > 0 {
            embeddings.len() as f64 / symbols_vec.len() as f64 * 100.0
        } else {
            100.0
        };
        let completion_msg = format!(
            "🔗 Unresolved symbol embeddings complete: {}/{} symbols (✅ {:.1}% success) | 🤖 {} | ⚡ AI matching enhanced",
            embeddings.len(),
            symbols_vec.len(),
            success_rate,
            provider
        );
        unresolved_pb.finish_with_message(completion_msg);
        info!(
            "🧠 Pre-computed {} unresolved symbol embeddings for professional AI matching",
            embeddings.len()
        );
        if embeddings.is_empty() {
            warn!(
                "⚠️ No unresolved symbol embeddings were generated - AI matching will be limited"
            );
        } else {
            info!(
                "✅ Professional AI semantic matching ready with {:.1}% unresolved coverage ({}/{})",
                embeddings.len() as f64 / unresolved_symbols.len() as f64 * 100.0,
                embeddings.len(),
                unresolved_symbols.len()
            );
        }
        embeddings
    }

    /// REVOLUTIONARY: AI-powered symbol resolution using semantic similarity
    #[cfg(feature = "ai-enhanced")]
    async fn ai_resolve_symbol(
        &self,
        target_symbol: &str,
        symbol_map: &std::collections::HashMap<String, NodeId>,
    ) -> Option<NodeId> {
        use codegraph_vector::{search::SemanticSearch, EmbeddingGenerator};
        use std::sync::Arc;
        // Use the same config as main indexing for consistency
        let embedder = EmbeddingGenerator::with_config(&self.global_config).await;
        if let Ok(target_embedding) = embedder.generate_text_embedding(target_symbol).await {
            // Find the most similar symbol in our symbol map using cosine similarity
            let mut best_match: Option<(NodeId, f32)> = None;
            for (symbol_name, &node_id) in symbol_map.iter() {
                if let Ok(symbol_embedding) = embedder.generate_text_embedding(symbol_name).await {
                    let similarity = self.cosine_similarity(&target_embedding, &symbol_embedding);
                    // Use a threshold for semantic similarity (0.7 = quite similar)
                    if similarity > 0.7 {
                        if let Some((_, best_score)) = best_match {
                            if similarity > best_score {
                                best_match = Some((node_id, similarity));
                            }
                        } else {
                            best_match = Some((node_id, similarity));
                        }
                    }
                }
            }
            if let Some((node_id, score)) = best_match {
                info!(
                    "AI resolved '{}' with {:.1}% confidence",
                    target_symbol,
                    score * 100.0
                );
                return Some(node_id);
            }
        }
        None
    }
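
    // Quick sanity check for cosine_similarity below (hand-computed):
    //   a = [1, 0], b = [1, 1]  →  dot = 1, ‖a‖ = 1, ‖b‖ = √2, cos = 1/√2 ≈ 0.707,
    // which just clears the 0.7 threshold used by ai_resolve_symbol above.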
similarity)); } } } } if let Some((node_id, score)) = best_match { info!( "AI resolved '{}' with {:.1}% confidence", target_symbol, score * 100.0 ); return Some(node_id); } } None } /// Calculate cosine similarity between two embeddings #[cfg(feature = "ai-enhanced")] fn cosine_similarity(&self, a: &[f32], b: &[f32]) -> f32 { if a.len() != b.len() { return 0.0; } let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum(); let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt(); let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt(); if norm_a == 0.0 || norm_b == 0.0 { 0.0 } else { dot_product / (norm_a * norm_b) } } /// REVOLUTIONARY: AI semantic matching with hybrid fuzzy + real AI embeddings (batched) #[cfg(feature = "ai-enhanced")] fn ai_semantic_match_sync( target_symbol: &str, symbol_map: &std::collections::HashMap<String, NodeId>, symbol_embeddings: &std::collections::HashMap<String, Vec<f32>>, unresolved_embeddings: &std::collections::HashMap<String, Vec<f32>>, ) -> Option<NodeId> { // DIAGNOSTIC: Track AI matching usage static AI_MATCH_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0); let call_count = AI_MATCH_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if call_count == 0 { info!( "🤖 AI SEMANTIC MATCHING ACTIVATED: First call with {} pre-computed embeddings", symbol_embeddings.len() ); } if symbol_embeddings.is_empty() { if call_count < 3 { // Log first few failures warn!( "❌ AI MATCH SKIPPED: No pre-computed embeddings available for '{}'", target_symbol ); } return None; } if call_count < 5 { info!( "🔍 Attempting HYBRID AI resolution for unresolved symbol: '{}'", target_symbol ); } let mut best_match: Option<(NodeId, f32)> = None; let fuzzy_threshold = 0.5; // PHASE 1: Fast fuzzy string similarity matching for (symbol_name, _) in symbol_embeddings.iter() { if let Some(&node_id) = symbol_map.get(symbol_name) { let target_lower = target_symbol.to_lowercase(); let symbol_lower = symbol_name.to_lowercase(); let fuzzy_score = if target_lower.contains(&symbol_lower) || symbol_lower.contains(&target_lower) { 0.85 // High confidence for substring matches } else if target_lower.ends_with(&symbol_lower) || symbol_lower.ends_with(&target_lower) { 0.75 // Good confidence for suffix matches } else if Self::levenshtein_similarity(&target_lower, &symbol_lower) > 0.7 { 0.65 // Decent confidence for edit distance similarity } else { continue; }; if fuzzy_score > fuzzy_threshold { if let Some((_, best_score)) = best_match { if fuzzy_score > best_score { best_match = Some((node_id, fuzzy_score)); } } else { best_match = Some((node_id, fuzzy_score)); } } } } // If fuzzy matching found a good match, return it if let Some((node_id, confidence)) = best_match { if confidence > 0.75 { // High confidence fuzzy match if call_count < 10 { info!( "🎯 AI FUZZY MATCH: '{}' → known symbol with {:.1}% confidence", target_symbol, confidence * 100.0 ); } return Some(node_id); } } // PHASE 2: Real AI embedding semantic similarity using pre-computed unresolved embeddings let mut ai_best_match: Option<(NodeId, f32)> = None; if let Some(target_embedding) = unresolved_embeddings.get(target_symbol) { if call_count < 5 { info!( "🔍 Using pre-computed embedding for unresolved symbol: '{}'", target_symbol ); } let ai_threshold = 0.75; // Higher threshold for real AI embeddings // Compare target embedding with ALL known symbol embeddings for (symbol_name, symbol_embedding) in symbol_embeddings.iter() { if let Some(&node_id) = symbol_map.get(symbol_name) 
{ let similarity = Self::cosine_similarity_static(target_embedding, symbol_embedding); if similarity > ai_threshold { if let Some((_, best_score)) = ai_best_match { if similarity > best_score { ai_best_match = Some((node_id, similarity)); } } else { ai_best_match = Some((node_id, similarity)); } } } } } // REVOLUTIONARY: Choose the best match between fuzzy and real AI embeddings let final_match = match (best_match, ai_best_match) { (Some((fuzzy_node, fuzzy_score)), Some((ai_node, ai_score))) => { // AI embeddings are more accurate than fuzzy when both exist if ai_score > 0.8 || (ai_score > fuzzy_score && ai_score > 0.7) { Some((ai_node, ai_score, "AI EMBEDDING")) } else { Some((fuzzy_node, fuzzy_score, "FUZZY")) } } (Some((fuzzy_node, fuzzy_score)), None) => Some((fuzzy_node, fuzzy_score, "FUZZY")), (None, Some((ai_node, ai_score))) => Some((ai_node, ai_score, "AI EMBEDDING")), (None, None) => None, }; if let Some((node_id, confidence, match_type)) = final_match { if call_count < 10 { info!( "🎯 {} MATCH: '{}' → known symbol with {:.1}% confidence", match_type, target_symbol, confidence * 100.0 ); } return Some(node_id); } None // No semantic match found } /// Calculate Levenshtein similarity score between two strings (0.0 to 1.0) #[cfg(feature = "ai-enhanced")] fn levenshtein_similarity(s1: &str, s2: &str) -> f32 { let len1 = s1.chars().count(); let len2 = s2.chars().count(); if len1 == 0 && len2 == 0 { return 1.0; } if len1 == 0 || len2 == 0 { return 0.0; } let max_len = len1.max(len2); let distance = Self::levenshtein_distance(s1, s2); 1.0 - (distance as f32 / max_len as f32) } /// Calculate Levenshtein distance between two strings #[cfg(feature = "ai-enhanced")] fn levenshtein_distance(s1: &str, s2: &str) -> usize { let v1: Vec<char> = s1.chars().collect(); let v2: Vec<char> = s2.chars().collect(); let len1 = v1.len(); let len2 = v2.len(); let mut matrix = vec![vec![0; len2 + 1]; len1 + 1]; for i in 0..=len1 { matrix[i][0] = i; } for j in 0..=len2 { matrix[0][j] = j; } for i in 1..=len1 { for j in 1..=len2 { let cost = if v1[i - 1] == v2[j - 1] { 0 } else { 1 }; matrix[i][j] = (matrix[i - 1][j] + 1) .min(matrix[i][j - 1] + 1) .min(matrix[i - 1][j - 1] + cost); } } matrix[len1][len2] } /// Static cosine similarity calculation for parallel processing #[cfg(feature = "ai-enhanced")] fn cosine_similarity_static(a: &[f32], b: &[f32]) -> f32 { if a.len() != b.len() { return 0.0; } let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum(); let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt(); let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt(); if norm_a == 0.0 || norm_b == 0.0 { 0.0 } else { dot_product / (norm_a * norm_b) } } async fn index_file( path: PathBuf, parse_pb: ProgressBar, embed_pb: ProgressBar, ) -> Result<FileStats> { debug!("Indexing file: {:?}", path); let mut stats = FileStats::default(); // Read file content let content = tokio_fs::read_to_string(&path).await?; stats.lines = content.lines().count(); // Very rough heuristics for functions/classes counts per common languages let ext = path.extension().and_then(OsStr::to_str).unwrap_or(""); let (fn_regex, class_regex) = match ext { "rs" => (Some(Regex::new(r"\bfn\s+\w+").unwrap()), None), "py" => ( Some(Regex::new(r"\bdef\s+\w+\s*\(").unwrap()), Some(Regex::new(r"\bclass\s+\w+\s*:").unwrap()), ), "ts" | "js" => ( Some(Regex::new(r"\bfunction\s+\w+|\b\w+\s*=\s*\(.*\)\s*=>").unwrap()), Some(Regex::new(r"\bclass\s+\w+").unwrap()), ), "go" => (Some(Regex::new(r"\bfunc\s+\w+\s*\(").unwrap()), None), 
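            // Note (added for clarity): the Java and C/C++ arms below rely on the
            // permissive pattern `\w+\s+\w+\s*\(.*\)\s*\{`, which also matches
            // constructs such as `else if (x) {`; treat these heuristic counts as
            // rough upper bounds rather than exact totals.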
"java" => ( Some(Regex::new(r"\b\w+\s+\w+\s*\(.*\)\s*\{").unwrap()), Some(Regex::new(r"\bclass\s+\w+").unwrap()), ), "cpp" | "cc" | "cxx" | "hpp" | "h" | "c" => ( Some(Regex::new(r"\b\w+\s+\w+\s*\(.*\)\s*\{").unwrap()), None, ), _ => (None, None), }; parse_pb.set_message(format!("Parsing {}", path.display())); parse_pb.inc(1); if let Some(re) = fn_regex { stats.functions = re.find_iter(&content).count(); } if let Some(re) = class_regex { stats.classes = re.find_iter(&content).count(); } // Pretend to generate embeddings by counting tokens roughly embed_pb.set_message(format!("Embedding {}", path.display())); stats.embeddings = content.split_whitespace().count() / 100; // 1 per ~100 tokens embed_pb.inc(1); Ok(stats) } async fn collect_files(&self, path: &Path) -> Result<Vec<PathBuf>> { let mut files = Vec::new(); let walker = if self.config.recursive { WalkDir::new(path) } else { WalkDir::new(path).max_depth(1) }; for entry in walker { let entry = entry?; let path = entry.path(); if path.is_dir() { continue; } if self.should_index(path) { files.push(path.to_path_buf()); } } Ok(files) } fn should_index(&self, path: &Path) -> bool { let file_name = path.file_name().and_then(OsStr::to_str).unwrap_or(""); let path_str = path.to_string_lossy(); // Exclude patterns (simple substring match) for pat in &self.config.exclude_patterns { if path_str.contains(pat) || file_name.contains(pat) { return false; } } // Include patterns (if provided, must match at least one) if !self.config.include_patterns.is_empty() && !self .config .include_patterns .iter() .any(|p| path_str.contains(p) || file_name.contains(p)) { return false; } // Language filtering by extension if !self.config.languages.is_empty() { let ext = path .extension() .and_then(OsStr::to_str) .unwrap_or("") .to_lowercase(); let lang_matches = |langs: &Vec<String>, e: &str| -> bool { let lang = e; langs.iter().any(|l| match l.as_str() { "rust" | "rs" => matches!(lang, "rs"), "python" | "py" => matches!(lang, "py"), "js" | "javascript" | "jsx" => matches!(lang, "js" | "jsx"), "ts" | "typescript" | "tsx" => matches!(lang, "ts" | "tsx"), "go" => matches!(lang, "go"), "java" => matches!(lang, "java"), "cpp" | "c++" | "cc" | "cxx" | "hpp" | "h" | "c" => { matches!(lang, "cpp" | "cc" | "cxx" | "hpp" | "h" | "c") } _ => false, }) }; if !lang_matches(&self.config.languages, &ext) { return false; } } // Default excludes for dir in [ ".git", "node_modules", "target", ".codegraph", "dist", "build", ] { if path_str.contains(dir) { return false; } } true } async fn is_indexed(&self, _path: &Path) -> Result<bool> { let db = { let storage = self.surreal.lock().await; storage.db() }; let mut response = db .query("SELECT VALUE count() FROM project_metadata WHERE project_id = $project_id") .bind(("project_id", self.project_id.clone())) .await .context("Failed to query project_metadata")?; let counts: Vec<i64> = response.take(0)?; let count = counts.get(0).cloned().unwrap_or(0); Ok(count > 0) } async fn has_file_metadata(&self) -> Result<bool> { let db = { let storage = self.surreal.lock().await; storage.db() }; let mut response = db .query("SELECT count() AS count FROM file_metadata WHERE project_id = $project_id GROUP ALL") .bind(("project_id", self.project_id.clone())) .await .context("Failed to query file_metadata count")?; let counts: Vec<i64> = response.take(0)?; let count = counts.get(0).cloned().unwrap_or(0); Ok(count > 0) } /// Calculate SHA-256 hash of file content fn calculate_file_hash(file_path: &Path) -> Result<String> { // Resolve symlinks to actual 
file let canonical_path = fs::canonicalize(file_path) .with_context(|| format!("Failed to resolve path: {:?}", file_path))?; let mut file = fs::File::open(&canonical_path) .with_context(|| format!("Failed to open file: {:?}", canonical_path))?; let mut hasher = Sha256::new(); let mut buffer = [0u8; 8192]; loop { let bytes_read = file .read(&mut buffer) .with_context(|| format!("Failed to read file: {:?}", canonical_path))?; if bytes_read == 0 { break; } hasher.update(&buffer[..bytes_read]); } Ok(format!("{:x}", hasher.finalize())) } /// Detect changes between current filesystem and stored file metadata async fn detect_file_changes(&self, current_files: &[PathBuf]) -> Result<Vec<FileChange>> { let storage = self.surreal.lock().await; let stored_metadata = storage .get_file_metadata_for_project(&self.project_id) .await?; drop(storage); // Create lookup map of stored files let stored_map: HashMap<String, FileMetadataRecord> = stored_metadata .into_iter() .map(|record| (record.file_path.clone(), record)) .collect(); let mut changes = Vec::new(); let mut current_file_set = HashSet::new(); // Check current files for additions or modifications for file_path in current_files { let file_path_str = file_path.to_string_lossy().to_string(); current_file_set.insert(file_path_str.clone()); let current_hash = Self::calculate_file_hash(file_path)?; match stored_map.get(&file_path_str) { Some(stored) => { if stored.content_hash != current_hash { changes.push(FileChange { file_path: file_path_str, change_type: FileChangeType::Modified, current_hash: Some(current_hash), previous_hash: Some(stored.content_hash.clone()), }); } else { changes.push(FileChange { file_path: file_path_str, change_type: FileChangeType::Unchanged, current_hash: Some(current_hash), previous_hash: Some(stored.content_hash.clone()), }); } } None => { changes.push(FileChange { file_path: file_path_str, change_type: FileChangeType::Added, current_hash: Some(current_hash), previous_hash: None, }); } } } // Check for deleted files for (stored_path, stored_record) in stored_map { if !current_file_set.contains(&stored_path) { changes.push(FileChange { file_path: stored_path, change_type: FileChangeType::Deleted, current_hash: None, previous_hash: Some(stored_record.content_hash), }); } } Ok(changes) } /// Delete nodes and edges for specific files async fn delete_data_for_files(&self, file_paths: &[String]) -> Result<()> { if file_paths.is_empty() { return Ok(()); } // Use writer to delete nodes, edges, and file metadata as one ordered operation self.surreal_writer_handle()? 
.enqueue_delete_nodes_by_file(file_paths.to_vec(), &self.project_id) .await } /// Persist file metadata for incremental indexing change tracking async fn persist_file_metadata( &self, files: &[PathBuf], nodes: &[CodeNode], edges: &[EdgeRelationship], ) -> Result<()> { let mut file_metadata_records = Vec::new(); // Create progress bar for file metadata let metadata_pb = self.progress.add(ProgressBar::new(files.len() as u64)); metadata_pb.set_style( ProgressStyle::with_template("{spinner:.blue} {msg} [{bar:40.cyan/blue}] {pos}/{len}") .unwrap() .progress_chars("█▓▒░ "), ); metadata_pb.set_message("💾 Processing file metadata"); // Build HashMap for O(1) lookups instead of O(N) iterations let mut file_stats: HashMap<String, (i64, i64)> = HashMap::new(); // Count nodes per file - O(nodes) for node in nodes { let entry = file_stats .entry(node.location.file_path.clone()) .or_insert((0, 0)); entry.0 += 1; } // Build node-to-file mapping for edge counting - O(nodes) let node_file_map: HashMap<NodeId, String> = nodes .iter() .map(|n| (n.id, n.location.file_path.clone())) .collect(); // Count edges per file - O(edges) for edge in edges { if let Some(file_path) = node_file_map.get(&edge.from) { let entry = file_stats.entry(file_path.clone()).or_insert((0, 0)); entry.1 += 1; } } for file_path in files { let file_path_str = file_path.to_string_lossy().to_string(); // Calculate hash and get file info let content_hash = Self::calculate_file_hash(file_path).unwrap_or_else(|_| "error".to_string()); let metadata = fs::metadata(file_path).ok(); let file_size = metadata.as_ref().map(|m| m.len() as i64).unwrap_or(0); let modified_at = metadata .as_ref() .and_then(|m| m.modified().ok()) .and_then(|t| { let duration = t.duration_since(std::time::UNIX_EPOCH).ok()?; Some( chrono::DateTime::from_timestamp(duration.as_secs() as i64, 0) .unwrap_or_else(chrono::Utc::now), ) }) .unwrap_or_else(chrono::Utc::now); // Get counts from HashMap - O(1) lookup let (node_count, edge_count) = file_stats.get(&file_path_str).copied().unwrap_or((0, 0)); file_metadata_records.push(FileMetadataRecord { file_path: file_path_str, project_id: self.project_id.clone(), content_hash, modified_at, file_size, last_indexed_at: chrono::Utc::now(), node_count, edge_count, language: None, // Will be inferred from file extension if needed parse_errors: None, }); metadata_pb.inc(1); } // Batch upsert file metadata self.surreal_writer_handle()? 
            .enqueue_file_metadata(file_metadata_records)
            .await?;

        metadata_pb.finish_with_message(format!(
            "💾 File metadata complete: {} files tracked",
            files.len()
        ));
        info!("💾 Persisted metadata for {} files", files.len());
        Ok(())
    }

    async fn persist_project_metadata(
        &self,
        stats: &IndexStats,
        node_count: usize,
        edge_count: usize,
    ) -> Result<()> {
        let project_name = self
            .project_root
            .file_name()
            .and_then(|s| s.to_str())
            .unwrap_or(&self.project_id)
            .to_string();
        let root_path = self.project_root.to_string_lossy().to_string();
        let primary_language = self.config.languages.first().cloned();

        let record = ProjectMetadataRecord {
            project_id: self.project_id.clone(),
            name: project_name,
            root_path,
            primary_language,
            file_count: stats.files as i64,
            node_count: node_count as i64,
            edge_count: edge_count as i64,
            avg_coverage_score: 0.0,
            last_analyzed: chrono::Utc::now(),
            codegraph_version: env!("CARGO_PKG_VERSION").to_string(),
            organization_id: self.organization_id.clone(),
            domain: self.domain.clone(),
        };

        self.enqueue_project_metadata_record(record).await
    }

    fn annotate_node(&self, node: &mut CodeNode) {
        node.metadata
            .attributes
            .insert("project_id".to_string(), self.project_id.clone());
        if let Some(org) = &self.organization_id {
            node.metadata
                .attributes
                .insert("organization_id".to_string(), org.clone());
        }
        if let Some(repo) = &self.repository_url {
            node.metadata
                .attributes
                .insert("repository_url".to_string(), repo.clone());
        }
        if let Some(domain) = &self.domain {
            node.metadata
                .attributes
                .insert("domain".to_string(), domain.clone());
        }
        // Add embedding model name for tracking which embedding provider was used
        node.metadata
            .attributes
            .insert("embedding_model".to_string(), self.embedding_model.clone());
    }

    async fn persist_nodes_batch(&self, chunk: &[CodeNode]) -> Result<()> {
        self.enqueue_nodes_chunk(chunk).await
    }

    async fn log_surreal_node_count(&self, expected: usize) {
        let db = {
            let storage = self.surreal.lock().await;
            storage.db()
        };
        // GROUP ALL aggregates to a single row; without it, count() is emitted
        // once per record and the Option<i64> take below cannot decode the result.
        match db
            .query("SELECT VALUE count() FROM nodes WHERE project_id = $project_id GROUP ALL")
            .bind(("project_id", self.project_id.clone()))
            .await
        {
            Ok(mut resp) => match resp.take::<Option<i64>>(0) {
                Ok(count_opt) => {
                    let count = count_opt.unwrap_or(0);
                    info!(
                        "🗄️ SurrealDB nodes persisted: {} (expected ≈ {})",
                        count, expected
                    );
                }
                Err(e) => {
                    warn!("⚠️ Failed to read SurrealDB node count: {}", e);
                }
            },
            Err(e) => {
                warn!("⚠️ SurrealDB node count query failed: {}", e);
            }
        }
    }

    #[cfg(feature = "embeddings")]
    async fn log_surreal_chunk_count(&self, expected: usize) {
        let db = {
            let storage = self.surreal.lock().await;
            storage.db()
        };
        // SELECT VALUE keeps the aggregated count a bare integer so it can be
        // decoded as Vec<i64> below.
        match db
            .query("SELECT VALUE count() FROM chunks WHERE project_id = $project_id GROUP ALL")
            .bind(("project_id", self.project_id.clone()))
            .await
        {
            Ok(mut resp) => match resp.take::<Vec<i64>>(0) {
                Ok(counts) => {
                    let count = counts.get(0).cloned().unwrap_or(0);
                    info!(
                        "🧩 SurrealDB chunks persisted: {} (expected ≈ {})",
                        count, expected
                    );
                }
                Err(e) => {
                    warn!("⚠️ Failed to read SurrealDB chunk count: {}", e);
                }
            },
            Err(e) => {
                warn!("⚠️ SurrealDB chunk count query failed: {}", e);
            }
        }
    }

    async fn log_surreal_edge_count(&self, expected: usize) {
        let db = {
            let storage = self.surreal.lock().await;
            storage.db()
        };
        match db.query("SELECT VALUE count() FROM edges GROUP ALL").await {
            Ok(mut resp) => match resp.take::<Option<i64>>(0) {
                Ok(count_opt) => {
                    let count = count_opt.unwrap_or(0);
                    info!(
                        "🗄️ SurrealDB edges persisted: {} (expected ≈ {})",
                        count, expected
                    );
                    if count < expected as i64 {
                        warn!(
                            "⚠️ Edge count ({}) is lower than resolved edges ({}).
Verify SurrealDB schema and filters.", count, expected ); } } Err(e) => warn!("⚠️ Failed to read SurrealDB edge count: {}", e), }, Err(e) => warn!("⚠️ SurrealDB edge count query failed: {}", e), } } async fn verify_file_metadata_count(&self, expected_files: usize) -> Result<()> { if expected_files == 0 { return Ok(()); } let db = { let storage = self.surreal.lock().await; storage.db() }; let mut resp = db .query( "SELECT count() AS count FROM file_metadata WHERE project_id = $project_id GROUP ALL", ) .bind(("project_id", self.project_id.clone())) .await .context("Failed to verify file_metadata count")?; let counts: Vec<serde_json::Value> = resp.take(0)?; let count = counts .get(0) .and_then(|v| v.get("count")) .and_then(|v| v.as_i64()) .unwrap_or(0); if count < expected_files as i64 { Err(anyhow!( "file_metadata count {} is less than expected {} for project {}", count, expected_files, self.project_id )) } else { Ok(()) } } async fn verify_project_metadata_present(&self) -> Result<()> { let db = { let storage = self.surreal.lock().await; storage.db() }; let mut resp = db .query("SELECT count() AS count FROM project_metadata WHERE project_id = $project_id GROUP ALL") .bind(("project_id", self.project_id.clone())) .await .context("Failed to verify project_metadata count")?; let counts: Vec<serde_json::Value> = resp.take(0)?; let count = counts .get(0) .and_then(|v| v.get("count")) .and_then(|v| v.as_i64()) .unwrap_or(0); if count < 1 { Err(anyhow!( "project_metadata missing for project {}; ensure schema applied and permissions allow writes", self.project_id )) } else { Ok(()) } } async fn persist_node_embeddings(&self, nodes: &[CodeNode]) -> Result<()> { let mut records = Vec::new(); for node in nodes { if let Some(embedding) = &node.embedding { records.push(NodeEmbeddingRecord { id: node.id.to_string(), column: self.embedding_column.column_name(), embedding: embedding.iter().map(|&v| v as f64).collect(), updated_at: chrono::Utc::now(), }); } } if records.is_empty() { return Ok(()); } self.surreal_writer_handle()? 
.enqueue_node_embeddings(records) .await } async fn store_symbol_embedding( &self, symbol: &str, node_id: Option<NodeId>, source_edge_id: Option<&str>, embedding: &[f32], ) -> Result<()> { let record = self.build_symbol_embedding_record(symbol, node_id, source_edge_id, embedding); self.persist_symbol_embedding_records(vec![record]).await } fn build_symbol_embedding_record( &self, symbol: &str, node_id: Option<NodeId>, source_edge_id: Option<&str>, embedding: &[f32], ) -> SymbolEmbeddingRecord { let normalized = Self::normalize_symbol(symbol); let node_id_string = node_id.map(|id| id.to_string()); SymbolEmbeddingRecord::new( &self.project_id, self.organization_id.as_deref(), symbol, &normalized, embedding, &self.embedding_model, self.embedding_column.column_name(), node_id_string.as_deref(), source_edge_id, None, ) } async fn persist_symbol_embedding_records( &self, records: Vec<SymbolEmbeddingRecord>, ) -> Result<()> { if records.is_empty() { return Ok(()); } let batch_size = symbol_embedding_db_batch_size(); let handle = self.surreal_writer_handle()?; for chunk in records.chunks(batch_size) { handle.enqueue_symbol_embeddings(chunk.to_vec()).await?; } Ok(()) } async fn enqueue_chunk_embeddings(&self, records: Vec<ChunkEmbeddingRecord>) -> Result<()> { if records.is_empty() { return Ok(()); } debug!( "🧩 Queueing {} chunk embeddings for SurrealDB", records.len() ); let batch_size = chunk_embedding_db_batch_size(self.config.batch_size.max(1)); let handle = self.surreal_writer_handle()?; for chunk in records.chunks(batch_size) { handle.enqueue_chunk_embeddings(chunk.to_vec()).await?; } Ok(()) } fn surreal_writer_handle(&self) -> Result<&SurrealWriterHandle> { self.surreal_writer .as_ref() .ok_or_else(|| anyhow!("Surreal writer not initialized")) } async fn enqueue_nodes_chunk(&self, nodes: &[CodeNode]) -> Result<()> { if nodes.is_empty() { return Ok(()); } let batch: Vec<CodeNode> = nodes.to_vec(); self.surreal_writer_handle()?.enqueue_nodes(batch).await } async fn enqueue_edges(&self, edges: Vec<CodeEdge>) -> Result<()> { if edges.is_empty() { return Ok(()); } self.surreal_writer_handle()?.enqueue_edges(edges).await } async fn enqueue_project_metadata_record(&self, record: ProjectMetadataRecord) -> Result<()> { self.surreal_writer_handle()? 
.enqueue_project_metadata(record) .await } async fn flush_surreal_writer(&self) -> Result<()> { if let Some(writer) = &self.surreal_writer { writer.flush().await } else { Ok(()) } } async fn shutdown_surreal_writer(&mut self) -> Result<()> { if let Some(writer) = self.surreal_writer.take() { writer.shutdown().await } else { Ok(()) } } async fn connect_surreal_from_env() -> Result<( Arc<TokioMutex<SurrealDbStorage>>, Vec<Arc<TokioMutex<SurrealDbStorage>>>, )> { let connection = Self::surreal_env_value("CODEGRAPH_SURREALDB_URL", "SURREALDB_URL") .context("CODEGRAPH_SURREALDB_URL or SURREALDB_URL must be set")?; let namespace = Self::surreal_env_value("CODEGRAPH_SURREALDB_NAMESPACE", "SURREALDB_NAMESPACE") .unwrap_or_else(|| "codegraph".to_string()); let use_graph_db = Self::surreal_env_value("CODEGRAPH_USE_GRAPH_SCHEMA", "") .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) .unwrap_or(false); let graph_db = Self::surreal_env_value("CODEGRAPH_GRAPH_DB_DATABASE", "") .unwrap_or_else(|| "codegraph_graph".to_string()); let database = if use_graph_db { graph_db } else { Self::surreal_env_value("CODEGRAPH_SURREALDB_DATABASE", "SURREALDB_DATABASE") .unwrap_or_else(|| "main".to_string()) }; let username = Self::surreal_env_value("CODEGRAPH_SURREALDB_USERNAME", "SURREALDB_USERNAME"); let password = Self::surreal_env_value("CODEGRAPH_SURREALDB_PASSWORD", "SURREALDB_PASSWORD"); info!( "🗄️ Connecting to SurrealDB: {} namespace={} database={}", Self::sanitize_surreal_url(&connection), namespace, database ); let config = SurrealDbConfig { connection: connection.clone(), namespace: namespace.clone(), database: database.clone(), username: username.clone(), password: password.clone(), ..SurrealDbConfig::default() }; let pool_size = std::env::var("CODEGRAPH_SURREAL_POOL_SIZE") .ok() .and_then(|v| v.parse::<usize>().ok()) .map(|n| n.clamp(1, 4)) .unwrap_or(1); let mut pool = Vec::with_capacity(pool_size); for _ in 0..pool_size { let storage = SurrealDbStorage::new(config.clone()) .await .with_context(|| format!("Failed to connect to SurrealDB at {}", connection))?; pool.push(Arc::new(TokioMutex::new(storage))); } let storage = pool .first() .cloned() .ok_or_else(|| anyhow!("Surreal pool empty after initialization"))?; info!( "🗄️ SurrealDB connection established: {} namespace={} database={}", Self::sanitize_surreal_url(&connection), namespace, database ); Ok((storage, pool)) } fn log_surrealdb_status(&self, phase: &str) { let connection = Self::surreal_env_value("CODEGRAPH_SURREALDB_URL", "SURREALDB_URL"); let namespace = Self::surreal_env_value("CODEGRAPH_SURREALDB_NAMESPACE", "SURREALDB_NAMESPACE") .unwrap_or_else(|| "codegraph".to_string()); let database = Self::surreal_env_value("CODEGRAPH_SURREALDB_DATABASE", "SURREALDB_DATABASE") .unwrap_or_else(|| "main".to_string()); let username = Self::surreal_env_value("CODEGRAPH_SURREALDB_USERNAME", "SURREALDB_USERNAME"); let auth_state = if username.is_some() || Self::surreal_env_value("CODEGRAPH_SURREALDB_PASSWORD", "SURREALDB_PASSWORD") .is_some() { "credentials configured" } else { "no auth" }; match connection { Some(raw) => { let sanitized = Self::sanitize_surreal_url(&raw); info!( "🗄️ SurrealDB ({}): target={} namespace={} database={} auth={}", phase, sanitized, namespace, database, auth_state ); } None => { info!( "🗄️ SurrealDB ({}): connection not configured (set CODEGRAPH_SURREALDB_URL or SURREALDB_URL)", phase ); } } } fn surreal_env_value(primary: &str, fallback: &str) -> Option<String> { env::var(primary) .ok() .filter(|v| !v.trim().is_empty()) 
.or_else(|| env::var(fallback).ok().filter(|v| !v.trim().is_empty())) } fn sanitize_surreal_url(raw: &str) -> String { if let Ok(mut url) = Url::parse(raw) { if !url.username().is_empty() { let _ = url.set_username("****"); } if url.password().is_some() { let _ = url.set_password(Some("****")); } url.to_string() } else { raw.to_string() } } fn normalize_symbol(symbol: &str) -> String { symbol.trim().to_lowercase() } fn normalize_symbol_target(target: &str) -> Vec<String> { // Normalize edge targets to match codegraph-parser emitted symbol keys. let mut variants = Vec::new(); let mut base = target.trim(); // Strip trailing macro bang if let Some(stripped) = base.strip_suffix('!') { base = stripped; } // Strip call parentheses and args: take up to first '(' if let Some(idx) = base.find('(') { base = &base[..idx]; } // Strip generics by removing everything from first '<' that has a matching '>' let mut generic_stripped = String::new(); let mut depth = 0; for ch in base.chars() { if ch == '<' { depth += 1; continue; } if ch == '>' && depth > 0 { depth -= 1; continue; } if depth == 0 { generic_stripped.push(ch); } } let mut cleaned = generic_stripped.trim().to_string(); // Strip trait qualification: "Type as Trait::method" -> "Type::method" if let Some(pos) = cleaned.find(" as ") { let after = &cleaned[pos + 4..]; if let Some(idx) = after.find("::") { cleaned = format!("{}{}", &cleaned[..pos], &after[idx..]); } } // Align with parser naming: drop self./this./super./crate:: prefixes let prefixes = ["self::", "self.", "this.", "super::", "super.", "crate::"]; for p in prefixes { if let Some(stripped) = cleaned.strip_prefix(p) { cleaned = stripped.to_string(); break; } } // Generate both module separators used by emitters (Rust uses ::, Python/JS often .) let dotted = cleaned.replace("::", "."); let coloned = cleaned.replace('.', "::"); let lower_clean = cleaned.to_lowercase(); let lower_dotted = dotted.to_lowercase(); let lower_coloned = coloned.to_lowercase(); variants.push(cleaned.clone()); variants.push(lower_clean.clone()); variants.push(dotted.clone()); variants.push(lower_dotted.clone()); variants.push(coloned.clone()); variants.push(lower_coloned.clone()); // Push last path segment variants for candidate in [&cleaned, &dotted, &coloned] { if let Some(last) = candidate.rsplit(['.', ':']).next() { variants.push(last.to_string()); variants.push(last.to_lowercase()); } } variants.sort(); variants.dedup(); variants } #[cfg(test)] pub(crate) fn normalize_symbol_target_for_tests(target: &str) -> Vec<String> { Self::normalize_symbol_target(target) } fn normalize_rust_symbol(target: &str) -> Vec<String> { // Demangle if possible (rustc, then Symbolic as fallback to strip hashes) let demangled = try_demangle(target) .map(|d| d.to_string()) .unwrap_or_else(|_| target.to_string()); let symbolic = demangle(&demangled).into_owned(); let mut out = Vec::new(); // Strip generics/path args via syn if let Ok(path) = parse_syn_path::<SynPath>(&symbolic) { let mut simple = Vec::new(); for seg in path.segments { let name = seg.ident.to_string(); simple.push(name); } if !simple.is_empty() { let joined = simple.join("::"); out.push(joined.clone()); out.push(joined.to_lowercase()); if let Some(last) = simple.last() { out.push(last.clone()); out.push(last.to_lowercase()); } } } // Fallback: return demangled text plus lowered forms if out.is_empty() { out.push(symbolic.clone()); out.push(symbolic.to_lowercase()); } out } fn normalize_js_symbol(target: &str) -> Vec<String> { // Lightweight heuristic until JS parser 
        // dependency is restored
        Self::normalize_symbol_target(target)
    }

    fn normalize_python_symbol(target: &str) -> Vec<String> {
        // Fallback: simple split heuristics for Python until AST parser is re-enabled
        Self::normalize_symbol_target(target)
    }

    fn create_progress_bar(&self, total: u64, message: &str) -> ProgressBar {
        let pb = self.progress.add(ProgressBar::new(total));
        pb.set_style(
            ProgressStyle::default_bar()
                .template(
                    "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({percent}%) {msg} | {per_sec} | ETA: {eta}",
                )
                .unwrap()
                .progress_chars("█▉▊▋▌▍▎▏ "), // Better visual progress
        );
        pb.set_message(message.to_string());
        pb
    }

    #[cfg(feature = "embeddings")]
    fn load_file_sources(
        &self,
        files: &Vec<(PathBuf, u64)>,
    ) -> std::collections::HashMap<String, String> {
        let mut map = std::collections::HashMap::new();
        for (p, _) in files {
            if let Ok(content) = std::fs::read_to_string(p) {
                map.insert(p.to_string_lossy().to_string(), content);
            }
        }
        map
    }

    /// Create enhanced progress bar with dual metrics for files and success rates
    fn create_dual_progress_bar(
        &self,
        total: u64,
        primary_msg: &str,
        secondary_msg: &str,
    ) -> ProgressBar {
        let pb = self.progress.add(ProgressBar::new(total));
        pb.set_style(
            ProgressStyle::default_bar()
                // The style attribute needs a ':' separator ("{msg:.bold}"); a bare
                // "{msg.bold}" is not a valid placeholder, so template parsing fails
                // and the unwrap below panics.
                .template(
                    "{spinner:.green} [{elapsed_precise}] [{bar:50.cyan/blue}] {pos}/{len} {msg:.bold} | Success Rate: {percent}% | Speed: {per_sec} | ETA: {eta}",
                )
                .unwrap()
                .progress_chars("█▉▊▋▌▍▎▏ "),
        );
        pb.set_message(format!("{} | {}", primary_msg, secondary_msg));
        pb
    }

    /// Create high-performance progress bar for batch processing
    fn create_batch_progress_bar(&self, total: u64, batch_size: usize, label: &str) -> ProgressBar {
        let pb = self.progress.add(ProgressBar::new(total));
        let batch_info = if batch_size >= 10000 {
            format!("🚀 Ultra-High Performance ({}K batch)", batch_size / 1000)
        } else if batch_size >= 5000 {
            format!("⚡ High Performance ({}K batch)", batch_size / 1000)
        } else if batch_size >= 1000 {
            format!("🔥 Optimized ({} batch)", batch_size)
        } else {
            format!("Standard ({} batch)", batch_size)
        };
        pb.set_style(
            ProgressStyle::default_bar()
                .template(
                    "{spinner:.green} [{elapsed_precise}] [{bar:45.cyan/blue}] {pos}/{len} items\n 💾 {msg} | {percent}% | {per_sec} | ETA: {eta}",
                )
                .unwrap()
                .progress_chars("█▉▊▋▌▍▎▏ "),
        );
        pb.set_message(format!("{} | {}", label, batch_info));
        pb
    }

    /// Index a single file (for daemon mode incremental updates)
    /// Uses upsert semantics - no duplicate records created
    pub async fn index_single_file(&self, path: &Path) -> Result<()> {
        // Check if file should be indexed
        if !self.should_index(path) {
            debug!("Skipping file (excluded by config): {:?}", path);
            return Ok(());
        }

        // Detect language
        let language = self.detect_language(path);
        if language.is_none() && self.config.languages.is_empty() {
            debug!("Skipping file (unknown language): {:?}", path);
            return Ok(());
        }

        let file_path_str = path.to_string_lossy().to_string();
        info!("Indexing single file: {}", file_path_str);

        // Step 1: Parse file with tree-sitter to extract nodes and edges
        let extraction_result = self
            .parser
            .parse_file_with_edges(&file_path_str)
            .await
            .with_context(|| format!("Failed to parse file: {}", file_path_str))?;

        let mut nodes = extraction_result.nodes;
        let mut edges = extraction_result.edges;

        if nodes.is_empty() {
            debug!("No nodes extracted from file: {}", file_path_str);
            return Ok(());
        }

        info!(
            "Extracted {} nodes and {} edges from {}",
            nodes.len(),
            edges.len(),
            file_path_str
        );

        // Step 2: Generate deterministic IDs and annotate nodes with project metadata
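        // Illustrative note (added, not original code): the parser assigns fresh
        // IDs on every parse, so this step re-derives each node's ID and records
        // old -> new so edges keep pointing at the right nodes. Assuming
        // `set_deterministic_id` derives the ID purely from stable node identity
        // plus the project ID, re-indexing an unchanged file reproduces the same
        // IDs and the upserts below stay idempotent.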
// Build old_id -> new_id mapping to update edge references let mut id_mapping: std::collections::HashMap<NodeId, NodeId> = std::collections::HashMap::new(); for node in &mut nodes { let old_id = node.id; node.set_deterministic_id(&self.project_id); id_mapping.insert(old_id, node.id); self.annotate_node(node); } // Update edge.from references to use new deterministic IDs for edge in &mut edges { if let Some(new_id) = id_mapping.get(&edge.from) { edge.from = *new_id; } } // Step 3: Generate embeddings for nodes (if embeddings feature enabled) #[cfg(feature = "embeddings")] { match self.embedder.generate_embeddings(&nodes).await { Ok(embeddings) => { // Assign embeddings to nodes for (node, embedding) in nodes.iter_mut().zip(embeddings.into_iter()) { node.embedding = Some(embedding); } debug!("Generated embeddings for {} nodes", nodes.len()); } Err(e) => { warn!( "Failed to generate embeddings for file {}: {}", file_path_str, e ); // Continue without embeddings - not a fatal error } } } // Step 4: Persist nodes via upsert (handles duplicates automatically) self.persist_nodes_batch(&nodes) .await .with_context(|| format!("Failed to persist nodes for file: {}", file_path_str))?; // Step 5: Persist node embeddings #[cfg(feature = "embeddings")] { if let Err(e) = self.persist_node_embeddings(&nodes).await { warn!( "Failed to persist embeddings for file {}: {}", file_path_str, e ); // Continue - not a fatal error } } // Step 6: Handle intra-file edges // Build a set of node IDs in this file for quick lookup let node_ids: std::collections::HashSet<_> = nodes.iter().map(|n| n.id.to_string()).collect(); // Build symbol name to node ID map for edge resolution let symbol_map: std::collections::HashMap<String, codegraph_core::NodeId> = nodes .iter() .map(|n| (n.name.to_string(), n.id.clone())) .collect(); // Convert EdgeRelationship to CodeEdge for intra-file edges only let resolved_edges: Vec<codegraph_graph::CodeEdge> = edges .into_iter() .filter_map(|edge_rel| { // Try to resolve the target symbol to a node ID in this file if let Some(target_id) = symbol_map.get(&edge_rel.to) { // Both from and to are in this file - create CodeEdge Some(codegraph_graph::CodeEdge::new( edge_rel.from, target_id.clone(), edge_rel.edge_type, )) } else { // Cross-file edge - skip for now (will be resolved on next full index) None } }) .collect(); if !resolved_edges.is_empty() { info!( "Persisting {} intra-file edges for {}", resolved_edges.len(), file_path_str ); self.enqueue_edges(resolved_edges) .await .with_context(|| format!("Failed to persist edges for file: {}", file_path_str))?; } info!( "Successfully indexed file: {} ({} nodes)", file_path_str, nodes.len() ); Ok(()) } /// Delete all indexed data for a file (cascade delete) /// Removes nodes, edges, embeddings, and metadata for the file pub async fn delete_file_data(&self, path: &Path) -> Result<()> { let file_path = path.to_string_lossy().to_string(); info!("Deleting indexed data for file: {}", file_path); self.delete_data_for_files(&[file_path]).await } /// Detect language from file extension fn detect_language(&self, path: &Path) -> Option<codegraph_core::Language> { use codegraph_core::Language; let ext = path.extension()?.to_str()?.to_lowercase(); match ext.as_str() { "rs" => Some(Language::Rust), "py" => Some(Language::Python), "ts" => Some(Language::TypeScript), "tsx" => Some(Language::TypeScript), "js" => Some(Language::JavaScript), "jsx" => Some(Language::JavaScript), "go" => Some(Language::Go), "java" => Some(Language::Java), "cpp" | "cc" | "cxx" | "hpp" 
| "hxx" | "c" | "h" => Some(Language::Cpp), "swift" => Some(Language::Swift), "kt" | "kts" => Some(Language::Kotlin), "cs" => Some(Language::CSharp), "rb" => Some(Language::Ruby), "php" => Some(Language::Php), "dart" => Some(Language::Dart), _ => None, } } pub async fn watch_for_changes(&self, path: impl AsRef<Path>) -> Result<()> { use notify::event::{EventKind, ModifyKind}; use notify::{Event, RecursiveMode, Watcher}; let path = path.as_ref().to_path_buf(); let (tx, mut rx) = mpsc::channel(100); let debounce_ms: u64 = std::env::var("CODEGRAPH_WATCH_DEBOUNCE_MS") .ok() .and_then(|v| v.parse().ok()) .unwrap_or(300); let mut watcher = notify::recommended_watcher(move |res: std::result::Result<Event, _>| { if let Ok(event) = res { let _ = tx.blocking_send(event); } })?; watcher.watch(&path, RecursiveMode::Recursive)?; info!("Watching for changes in: {:?}", path); use std::collections::HashMap; use std::time::{Duration, Instant}; let mut last_events: HashMap<PathBuf, Instant> = HashMap::new(); while let Some(event) = rx.recv().await { match event.kind { EventKind::Modify(ModifyKind::Data(_)) | EventKind::Create(_) => { for path in event.paths { if self.should_index(&path) { let now = Instant::now(); let entry = last_events.entry(path.clone()).or_insert(now); if now.duration_since(*entry).as_millis() as u64 >= debounce_ms { *entry = now; info!("File changed: {:?}, reindexing (debounced)...", path); if let Err(e) = self.index_single_file(&path).await { warn!("Incremental reindex failed for {:?}: {}", path, e); } } else { debug!("Debounced change for {:?}", path); } } } } _ => {} } } Ok(()) } } fn symbol_embedding_db_batch_size() -> usize { const MAX: usize = 512; std::env::var("CODEGRAPH_SYMBOL_DB_BATCH_SIZE") .ok() .and_then(|value| value.parse::<usize>().ok()) .map(|parsed| parsed.clamp(1, MAX)) .unwrap_or(SYMBOL_EMBEDDING_DB_BATCH_LIMIT) } fn chunk_embedding_db_batch_size(fallback: usize) -> usize { const MAX: usize = 512; const DEFAULT_CAP: usize = 32; // Surreal query depth is limited; keep chunk writes modest by default std::env::var("CODEGRAPH_CHUNK_DB_BATCH_SIZE") .ok() .and_then(|value| value.parse::<usize>().ok()) .map(|parsed| parsed.clamp(1, MAX)) .unwrap_or(fallback.min(DEFAULT_CAP).min(MAX)) } fn resolve_surreal_embedding_column(dim: usize) -> Result<SurrealEmbeddingColumn> { match dim { 384 => Ok(SurrealEmbeddingColumn::Embedding384), 768 => Ok(SurrealEmbeddingColumn::Embedding768), 1024 => Ok(SurrealEmbeddingColumn::Embedding1024), 1536 => Ok(SurrealEmbeddingColumn::Embedding1536), 2048 => Ok(SurrealEmbeddingColumn::Embedding2048), 2560 => Ok(SurrealEmbeddingColumn::Embedding2560), 3072 => Ok(SurrealEmbeddingColumn::Embedding3072), 4096 => Ok(SurrealEmbeddingColumn::Embedding4096), other => Err(anyhow!( "Unsupported embedding dimension {}. 
Supported: 384, 768, 1024, 1536, 2048, 2560, 3072, 4096", other )), } } #[cfg(test)] mod tests { use super::*; #[test] fn symbol_embedding_batch_size_defaults() { std::env::remove_var("CODEGRAPH_SYMBOL_DB_BATCH_SIZE"); assert_eq!( symbol_embedding_db_batch_size(), SYMBOL_EMBEDDING_DB_BATCH_LIMIT ); } #[test] fn symbol_embedding_batch_size_respects_env_and_clamps() { std::env::set_var("CODEGRAPH_SYMBOL_DB_BATCH_SIZE", "1024"); assert_eq!(symbol_embedding_db_batch_size(), 512); std::env::set_var("CODEGRAPH_SYMBOL_DB_BATCH_SIZE", "0"); assert_eq!(symbol_embedding_db_batch_size(), 1); std::env::remove_var("CODEGRAPH_SYMBOL_DB_BATCH_SIZE"); } #[test] fn surreal_embedding_column_supports_2560_dimension() { let column = resolve_surreal_embedding_column(2560).expect("2560-d embeddings should be supported"); assert_eq!(column.column_name(), SURR_EMBEDDING_COLUMN_2560); assert_eq!(column.dimension(), 2560); } #[test] fn surreal_embedding_column_supports_1536_dimension() { let column = resolve_surreal_embedding_column(1536).expect("1536-d embeddings should be supported"); assert_eq!(column.column_name(), SURR_EMBEDDING_COLUMN_1536); assert_eq!(column.dimension(), 1536); } #[test] fn surreal_embedding_column_supports_3072_dimension() { let column = resolve_surreal_embedding_column(3072).expect("3072-d embeddings should be supported"); assert_eq!(column.column_name(), SURR_EMBEDDING_COLUMN_3072); assert_eq!(column.dimension(), 3072); } } pub fn prepare_node_text(node: &CodeNode) -> String { let lang = node .language .as_ref() .map(|l| format!("{:?}", l)) .unwrap_or_else(|| "unknown".to_string()); let kind = node .node_type .as_ref() .map(|t| format!("{:?}", t)) .unwrap_or_else(|| "unknown".to_string()); let mut text = format!("{} {} {}", lang, kind, node.name); if let Some(c) = &node.content { text.push(' '); text.push_str(c); } // Semantic chunking with environment variable support let max_chunk_tokens = std::env::var("CODEGRAPH_MAX_CHUNK_TOKENS") .ok() .and_then(|v| v.parse::<usize>().ok()) .unwrap_or(512); // Default 512 tokens // Approximate character limit for quick check (1 token ≈ 4 chars) let approx_max_chars = max_chunk_tokens * 4; if text.len() > approx_max_chars { // Load Qwen2.5-Coder tokenizer for accurate token counting let tokenizer_path = std::path::PathBuf::from(concat!( env!("CARGO_MANIFEST_DIR"), "/../codegraph-vector/tokenizers/qwen2.5-coder.json" )); if let Ok(tokenizer) = tokenizers::Tokenizer::from_file(&tokenizer_path) { // Proper token-based chunking with Qwen2.5-Coder tokenizer let tok = std::sync::Arc::new(tokenizer); let token_counter = move |s: &str| -> usize { tok.encode(s, false) .map(|enc| enc.len()) .unwrap_or_else(|_| (s.len() + 3) / 4) }; let chunker = semchunk_rs::Chunker::new(max_chunk_tokens, Box::new(token_counter)); let chunks = chunker.chunk(&text); if let Some(first_chunk) = chunks.first() { text = first_chunk.clone(); } else { // Fallback to character truncation let mut new_len = approx_max_chars.min(text.len()); while new_len > 0 && !text.is_char_boundary(new_len) { new_len -= 1; } text.truncate(new_len); } } else { // Tokenizer not available - fallback to character truncation let mut new_len = approx_max_chars.min(text.len()); while new_len > 0 && !text.is_char_boundary(new_len) { new_len -= 1; } text.truncate(new_len); } } text } pub fn simple_text_embedding(text: &str, dimension: usize) -> Vec<f32> { let mut embedding = vec![0.0f32; dimension]; let mut hash = 5381u32; for b in text.bytes() { hash = hash.wrapping_mul(33).wrapping_add(b as u32); } let mut 
state = hash; for value in &mut embedding { state = state.wrapping_mul(1103515245).wrapping_add(12345); *value = ((state as f32 / u32::MAX as f32) - 0.5) * 2.0; } embedding } pub fn normalize(v: &[f32]) -> Vec<f32> { let mut out = v.to_vec(); let norm: f32 = out.iter().map(|x| x * x).sum::<f32>().sqrt(); if norm > 0.0 { for x in &mut out { *x /= norm; } } out } #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] pub struct IndexStats { pub files: usize, pub skipped: usize, pub lines: usize, pub functions: usize, pub classes: usize, pub structs: usize, pub traits: usize, pub embeddings: usize, pub errors: usize, // Extended stats for detailed reporting pub nodes: usize, pub edges: usize, pub chunks: usize, pub embedding_dimension: usize, pub embedding_provider: String, } impl IndexStats { fn merge(&mut self, other: FileStats) { self.files += 1; self.lines += other.lines; self.functions += other.functions; self.classes += other.classes; self.structs += other.structs; self.traits += other.traits; self.embeddings += other.embeddings; } } #[derive(Debug, Default, Clone)] struct FileStats { lines: usize, functions: usize, classes: usize, structs: usize, traits: usize, embeddings: usize, } #[cfg(feature = "embeddings")] use codegraph_vector::prep::chunker::ChunkMeta;
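// --- Added usage sketch (not part of the original implementation) ---
// A minimal, self-contained illustration of the fallback embedding helpers
// defined above. The module and test names here are ours; the only assumptions
// are the behaviours visible in this file: `simple_text_embedding` is seeded
// purely by a hash of the input bytes, and `normalize` performs L2
// normalization.
#[cfg(test)]
mod fallback_embedding_sketch {
    use super::{normalize, simple_text_embedding};

    // For unit-length vectors, cosine similarity reduces to the dot product.
    fn cosine(a: &[f32], b: &[f32]) -> f32 {
        a.iter().zip(b.iter()).map(|(x, y)| x * y).sum()
    }

    #[test]
    fn fallback_embeddings_are_deterministic_and_unit_normalizable() {
        // The generator is a hash-seeded LCG, so equal inputs give equal vectors.
        let a = simple_text_embedding("fn parse_file_with_edges", 384);
        let b = simple_text_embedding("fn parse_file_with_edges", 384);
        assert_eq!(a, b);
        assert_eq!(a.len(), 384);

        // After L2 normalization the vector has (approximately) unit length,
        // so its cosine similarity with itself is ~1.0.
        let unit = normalize(&a);
        let norm: f32 = unit.iter().map(|x| x * x).sum::<f32>().sqrt();
        assert!((norm - 1.0).abs() < 1e-4);
        assert!((cosine(&unit, &unit) - 1.0).abs() < 1e-4);

        // Different inputs (almost always) produce different vectors.
        let c = simple_text_embedding("fn collect_files", 384);
        assert_ne!(a, c);
    }
}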
