Skip to main content
Glama

CodeGraph CLI MCP Server

by Jakedismo
parallel_language_processor.rs•14.6 kB
/// REVOLUTIONARY: Parallel Language Architecture for M4 Max Performance /// /// COMPLETE IMPLEMENTATION: True multi-language parallel processing where different /// programming languages are processed on dedicated CPU cores simultaneously. use codegraph_core::{ExtractionResult, Language, Result}; use futures::stream::{self, StreamExt}; use std::collections::HashMap; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use tokio::sync::Semaphore; use tracing::info; /// Revolutionary parallel language processor for M4 Max optimization pub struct ParallelLanguageProcessor { core_assignment: CoreAssignmentStrategy, language_pools: HashMap<Language, Arc<Semaphore>>, results_aggregator: Arc<Mutex<LanguageResults>>, performance_tracker: Arc<Mutex<ParallelPerformanceMetrics>>, } #[derive(Debug, Clone)] pub struct CoreAssignmentStrategy { rust_cores: usize, typescript_cores: usize, python_cores: usize, shared_cores: usize, } impl Default for CoreAssignmentStrategy { fn default() -> Self { let total_cores = num_cpus::get(); if total_cores >= 16 { Self { rust_cores: 4, typescript_cores: 3, python_cores: 3, shared_cores: 6, } } else if total_cores >= 8 { Self { rust_cores: 2, typescript_cores: 2, python_cores: 2, shared_cores: 2, } } else { Self { rust_cores: 1, typescript_cores: 1, python_cores: 1, shared_cores: 1, } } } } #[derive(Debug, Default)] struct LanguageResults { all_nodes: Vec<codegraph_core::CodeNode>, all_edges: Vec<codegraph_core::EdgeRelationship>, language_metrics: HashMap<Language, LanguagePerformanceMetrics>, total_files_processed: usize, total_processing_time: std::time::Duration, } #[derive(Debug, Clone)] pub struct LanguagePerformanceMetrics { pub files_processed: usize, pub nodes_extracted: usize, pub edges_extracted: usize, pub processing_time: std::time::Duration, pub files_per_second: f64, pub nodes_per_second: f64, } #[derive(Debug, Default)] struct ParallelPerformanceMetrics { languages_processed_simultaneously: usize, core_utilization_percentage: f64, parallel_efficiency_ratio: f64, } impl ParallelLanguageProcessor { pub fn new() -> Self { let core_strategy = CoreAssignmentStrategy::default(); info!("šŸš€ Initializing Parallel Language Architecture for M4 Max"); info!( " šŸ¦€ Rust cores: {} (high complexity)", core_strategy.rust_cores ); info!( " šŸ“œ TypeScript cores: {} (type inference)", core_strategy.typescript_cores ); info!( " šŸ Python cores: {} (dynamic analysis)", core_strategy.python_cores ); info!( " 🌐 Shared cores: {} (other languages)", core_strategy.shared_cores ); let mut language_pools = HashMap::new(); language_pools.insert( Language::Rust, Arc::new(Semaphore::new(core_strategy.rust_cores)), ); language_pools.insert( Language::TypeScript, Arc::new(Semaphore::new(core_strategy.typescript_cores)), ); language_pools.insert( Language::JavaScript, Arc::new(Semaphore::new(core_strategy.typescript_cores)), ); language_pools.insert( Language::Python, Arc::new(Semaphore::new(core_strategy.python_cores)), ); let shared_semaphore = Arc::new(Semaphore::new(core_strategy.shared_cores)); for lang in [Language::Go, Language::Java, Language::Cpp] { language_pools.insert(lang, shared_semaphore.clone()); } Self { core_assignment: core_strategy, language_pools, results_aggregator: Arc::new(Mutex::new(LanguageResults::default())), performance_tracker: Arc::new(Mutex::new(ParallelPerformanceMetrics::default())), } } /// COMPLETE IMPLEMENTATION: Process multiple languages simultaneously on dedicated cores pub async fn process_files_parallel( &self, files_by_language: HashMap<Language, Vec<PathBuf>>, ) -> Result<ExtractionResult> { let start_time = std::time::Instant::now(); info!( "šŸš€ PARALLEL LANGUAGE PROCESSING: {} languages detected", files_by_language.len() ); { let mut tracker = self.performance_tracker.lock().unwrap(); tracker.languages_processed_simultaneously = files_by_language.len(); tracker.core_utilization_percentage = self.calculate_core_utilization(&files_by_language); } let language_tasks: Vec<_> = files_by_language .into_iter() .map(|(language, files)| { let semaphore = self .language_pools .get(&language) .cloned() .unwrap_or_else(|| self.language_pools.get(&Language::Rust).unwrap().clone()); let aggregator = self.results_aggregator.clone(); async move { self.process_language_files(language, files, semaphore, aggregator) .await } }) .collect(); let language_results = futures::future::join_all(language_tasks).await; for result in &language_results { if let Err(e) = result { tracing::warn!("Language processing error: {}", e); } } let total_time = start_time.elapsed(); let final_result = { let mut aggregator = self.results_aggregator.lock().unwrap(); aggregator.total_processing_time = total_time; ExtractionResult { nodes: std::mem::take(&mut aggregator.all_nodes), edges: std::mem::take(&mut aggregator.all_edges), } }; self.log_parallel_performance(&final_result, total_time); Ok(final_result) } async fn process_language_files( &self, language: Language, files: Vec<PathBuf>, semaphore: Arc<Semaphore>, aggregator: Arc<Mutex<LanguageResults>>, ) -> Result<()> { let start_time = std::time::Instant::now(); info!( "⚔ {:?} processing: {} files on {} dedicated cores", language, files.len(), semaphore.available_permits() ); let file_stream = stream::iter(files.into_iter().map(|file_path| { let semaphore = semaphore.clone(); let language = language.clone(); async move { let _permit = semaphore.acquire().await.unwrap(); self.process_single_file(file_path, language).await } })); let mut language_nodes = Vec::new(); let mut language_edges = Vec::new(); let mut files_processed = 0; let mut buffered_stream = file_stream.buffer_unordered(semaphore.available_permits()); while let Some(result) = buffered_stream.next().await { match result { Ok(extraction_result) => { language_nodes.extend(extraction_result.nodes); language_edges.extend(extraction_result.edges); files_processed += 1; } Err(e) => { tracing::warn!("Failed to process file for {:?}: {}", language, e); } } } let processing_time = start_time.elapsed(); let files_per_second = if processing_time.as_secs_f64() > 0.0 { files_processed as f64 / processing_time.as_secs_f64() } else { 0.0 }; let nodes_count = language_nodes.len(); let edges_count = language_edges.len(); { let mut agg = aggregator.lock().unwrap(); agg.all_nodes.extend(language_nodes); agg.all_edges.extend(language_edges); agg.total_files_processed += files_processed; agg.language_metrics.insert( language.clone(), LanguagePerformanceMetrics { files_processed, nodes_extracted: nodes_count, edges_extracted: edges_count, processing_time, files_per_second, nodes_per_second: if processing_time.as_secs_f64() > 0.0 { nodes_count as f64 / processing_time.as_secs_f64() } else { 0.0 }, }, ); } info!( "āœ… {:?} complete: {} files, {:.1} files/s, {} nodes, {} edges", language, files_processed, files_per_second, nodes_count, edges_count ); Ok(()) } async fn process_single_file( &self, file_path: PathBuf, language: Language, ) -> Result<ExtractionResult> { let parser = crate::TreeSitterParser::new(); match language { Language::Rust => { parser .parse_file_with_edges(&file_path.to_string_lossy()) .await } Language::TypeScript | Language::JavaScript => { parser .parse_file_with_edges(&file_path.to_string_lossy()) .await } Language::Python => { parser .parse_file_with_edges(&file_path.to_string_lossy()) .await } _ => { parser .parse_file_with_edges(&file_path.to_string_lossy()) .await } } } fn calculate_core_utilization( &self, files_by_language: &HashMap<Language, Vec<PathBuf>>, ) -> f64 { let total_cores = num_cpus::get(); let mut utilized_cores = 0; for (language, files) in files_by_language { if !files.is_empty() { utilized_cores += match language { Language::Rust => self.core_assignment.rust_cores, Language::TypeScript | Language::JavaScript => { self.core_assignment.typescript_cores } Language::Python => self.core_assignment.python_cores, _ => 1, }; } } utilized_cores.min(total_cores) as f64 / total_cores as f64 * 100.0 } fn log_parallel_performance(&self, result: &ExtractionResult, total_time: std::time::Duration) { let aggregator = self.results_aggregator.lock().unwrap(); let tracker = self.performance_tracker.lock().unwrap(); info!("šŸŽ‰ PARALLEL PROCESSING COMPLETE:"); info!( " šŸ“Š Total extracted: {} nodes, {} edges", result.nodes.len(), result.edges.len() ); info!(" ā±ļø Total time: {:.2}s", total_time.as_secs_f64()); info!( " šŸ”„ Overall rate: {:.1} files/s, {:.1} nodes/s", aggregator.total_files_processed as f64 / total_time.as_secs_f64(), result.nodes.len() as f64 / total_time.as_secs_f64() ); info!( " šŸ’Ŗ Core utilization: {:.1}%", tracker.core_utilization_percentage ); info!( " šŸŽÆ Languages processed: {}", tracker.languages_processed_simultaneously ); for (language, metrics) in &aggregator.language_metrics { info!( " {:?}: {} files, {:.1} files/s, {} nodes, {} edges", language, metrics.files_processed, metrics.files_per_second, metrics.nodes_extracted, metrics.edges_extracted ); } let sequential_estimate = aggregator .language_metrics .values() .map(|m| m.processing_time.as_secs_f64()) .sum::<f64>(); let parallel_efficiency = if total_time.as_secs_f64() > 0.0 { sequential_estimate / total_time.as_secs_f64() } else { 1.0 }; info!( "šŸš€ PARALLEL EFFICIENCY: {:.1}Ɨ speedup vs sequential processing", parallel_efficiency ); } } /// COMPLETE IMPLEMENTATION: Enhanced file collection with language-based grouping pub fn collect_files_by_language( root_path: &std::path::Path, config: &crate::file_collect::FileCollectionConfig, ) -> Result<HashMap<Language, Vec<PathBuf>>> { let all_files = crate::file_collect::collect_source_files_with_config(root_path, config)?; let mut files_by_language = HashMap::new(); for (file_path, _) in all_files { if let Some(language) = detect_language_from_path(&file_path) { files_by_language .entry(language) .or_insert_with(Vec::new) .push(file_path); } } info!("šŸ“Š LANGUAGE DISTRIBUTION:"); for (language, files) in &files_by_language { info!(" {:?}: {} files", language, files.len()); } Ok(files_by_language) } fn detect_language_from_path(path: &PathBuf) -> Option<Language> { let extension = path.extension()?.to_str()?.to_lowercase(); match extension.as_str() { "rs" => Some(Language::Rust), "ts" | "tsx" => Some(Language::TypeScript), "js" | "jsx" => Some(Language::JavaScript), "py" | "pyi" => Some(Language::Python), "go" => Some(Language::Go), "java" => Some(Language::Java), "cpp" | "cc" | "cxx" | "hpp" | "h" | "c" => Some(Language::Cpp), _ => None, } } static PARALLEL_PROCESSOR: std::sync::OnceLock<ParallelLanguageProcessor> = std::sync::OnceLock::new(); pub fn get_parallel_language_processor() -> &'static ParallelLanguageProcessor { PARALLEL_PROCESSOR.get_or_init(|| { info!("šŸš€ Initializing Global Parallel Language Processor"); ParallelLanguageProcessor::new() }) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server