Skip to main content
Glama

CodeGraph CLI MCP Server

by Jakedismo
optimized_io.rs•18.7 kB
use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::{RwLock, Mutex, Semaphore}; use tokio::time::timeout; use parking_lot::RwLock as SyncRwLock; use crossbeam_channel::{bounded, Receiver, Sender}; use crate::{CacheKey, CacheEntry, CacheStats}; use codegraph_core::{NodeId, CodeGraphError, Result}; /// High-performance I/O optimization strategies for 3x throughput improvement pub struct OptimizedIOManager { /// Batched read operations to reduce I/O overhead batch_reader: Arc<BatchedIOReader>, /// Intelligent write buffering and flushing write_buffer: Arc<BufferedWriter>, /// Predictive prefetching based on access patterns prefetch_engine: Arc<PrefetchEngine>, /// Compressed storage for large data compression_layer: Arc<CompressionLayer>, /// Performance metrics and monitoring io_metrics: Arc<SyncRwLock<IOMetrics>>, } #[derive(Debug, Default)] pub struct IOMetrics { pub total_reads: u64, pub total_writes: u64, pub cache_hits: u64, pub cache_misses: u64, pub bytes_read: u64, pub bytes_written: u64, pub avg_read_latency_ms: f64, pub avg_write_latency_ms: f64, pub compression_ratio: f64, pub prefetch_accuracy: f64, } impl OptimizedIOManager { pub fn new(config: IOConfig) -> Self { let batch_reader = Arc::new(BatchedIOReader::new(config.batch_size)); let write_buffer = Arc::new(BufferedWriter::new(config.buffer_size, config.flush_interval)); let prefetch_engine = Arc::new(PrefetchEngine::new(config.prefetch_depth, config.pattern_history)); let compression_layer = Arc::new(CompressionLayer::new(config.compression_threshold)); Self { batch_reader, write_buffer, prefetch_engine, compression_layer, io_metrics: Arc::new(SyncRwLock::new(IOMetrics::default())), } } pub async fn optimized_read<T>(&self, keys: Vec<CacheKey>) -> Result<Vec<Option<T>>> where T: serde::de::DeserializeOwned + Send + 'static, { let start = Instant::now(); // 1. Check prefetch cache first let (prefetch_hits, remaining_keys) = self.prefetch_engine.check_prefetched(&keys).await; // 2. Batch read remaining keys let batch_results = if !remaining_keys.is_empty() { self.batch_reader.batch_read(remaining_keys).await? } else { Vec::new() }; // 3. Merge prefetch hits with batch results let merged_results = self.merge_results(prefetch_hits, batch_results); // 4. Update metrics let elapsed = start.elapsed(); self.update_read_metrics(keys.len(), merged_results.len(), elapsed).await; // 5. Update access patterns for future prefetching self.prefetch_engine.record_access_pattern(&keys).await; Ok(merged_results) } pub async fn optimized_write<T>(&self, data: Vec<(CacheKey, T)>) -> Result<()> where T: serde::Serialize + Send + 'static, { let start = Instant::now(); // 1. Compress data if beneficial let compressed_data = self.compression_layer.compress_batch(&data).await?; // 2. Buffer writes for efficient batching self.write_buffer.buffer_writes(compressed_data).await?; // 3. Update metrics let elapsed = start.elapsed(); self.update_write_metrics(data.len(), elapsed).await; Ok(()) } pub async fn get_metrics(&self) -> IOMetrics { self.io_metrics.read().clone() } async fn merge_results<T>(&self, prefetch_hits: Vec<Option<T>>, batch_results: Vec<Option<T>>) -> Vec<Option<T>> { // Efficient merge logic here let mut merged = Vec::with_capacity(prefetch_hits.len() + batch_results.len()); merged.extend(prefetch_hits); merged.extend(batch_results); merged } async fn update_read_metrics(&self, requested: usize, found: usize, latency: Duration) { let mut metrics = self.io_metrics.write(); metrics.total_reads += requested as u64; metrics.cache_hits += found as u64; metrics.cache_misses += (requested - found) as u64; let latency_ms = latency.as_secs_f64() * 1000.0; metrics.avg_read_latency_ms = if metrics.total_reads == 1 { latency_ms } else { (metrics.avg_read_latency_ms + latency_ms) / 2.0 }; } async fn update_write_metrics(&self, count: usize, latency: Duration) { let mut metrics = self.io_metrics.write(); metrics.total_writes += count as u64; let latency_ms = latency.as_secs_f64() * 1000.0; metrics.avg_write_latency_ms = if metrics.total_writes == 1 { latency_ms } else { (metrics.avg_write_latency_ms + latency_ms) / 2.0 }; } } /// Batched I/O reader for reducing system call overhead pub struct BatchedIOReader { batch_size: usize, pending_reads: Arc<Mutex<VecDeque<ReadRequest>>>, batch_processor: Arc<Semaphore>, } #[derive(Debug)] struct ReadRequest { key: CacheKey, response_sender: tokio::sync::oneshot::Sender<Option<Vec<u8>>>, } impl BatchedIOReader { pub fn new(batch_size: usize) -> Self { Self { batch_size, pending_reads: Arc::new(Mutex::new(VecDeque::new())), batch_processor: Arc::new(Semaphore::new(1)), } } pub async fn batch_read<T>(&self, keys: Vec<CacheKey>) -> Result<Vec<Option<T>>> where T: serde::de::DeserializeOwned, { let mut results = Vec::with_capacity(keys.len()); let mut receivers = Vec::new(); // Queue read requests { let mut pending = self.pending_reads.lock().await; for key in keys { let (sender, receiver) = tokio::sync::oneshot::channel(); pending.push_back(ReadRequest { key, response_sender: sender, }); receivers.push(receiver); } } // Process batches if threshold reached if self.should_process_batch().await { self.process_batch().await?; } // Collect results for receiver in receivers { match timeout(Duration::from_millis(100), receiver).await { Ok(Ok(Some(data))) => { let deserialized: T = bincode::deserialize(&data) .map_err(|e| CodeGraphError::Serialization(e.to_string()))?; results.push(Some(deserialized)); } _ => results.push(None), } } Ok(results) } async fn should_process_batch(&self) -> bool { let pending = self.pending_reads.lock().await; pending.len() >= self.batch_size } async fn process_batch(&self) -> Result<()> { let _permit = self.batch_processor.acquire().await .map_err(|_| CodeGraphError::Concurrency("Failed to acquire batch processor".into()))?; let mut batch = Vec::new(); // Extract batch { let mut pending = self.pending_reads.lock().await; for _ in 0..self.batch_size.min(pending.len()) { if let Some(request) = pending.pop_front() { batch.push(request); } } } if batch.is_empty() { return Ok(()); } // Simulate batched I/O operation (replace with actual implementation) let results = self.perform_batched_io(&batch).await?; // Send results back for (request, result) in batch.into_iter().zip(results) { let _ = request.response_sender.send(result); } Ok(()) } async fn perform_batched_io(&self, batch: &[ReadRequest]) -> Result<Vec<Option<Vec<u8>>>> { // Placeholder for actual I/O implementation // In practice, this would interface with RocksDB, FAISS, or other storage let mut results = Vec::with_capacity(batch.len()); for request in batch { // Simulate I/O delay and data retrieval tokio::time::sleep(Duration::from_micros(10)).await; // Mock data based on key hash (replace with real I/O) let data = format!("data_for_key_{}", request.key.hash); results.push(Some(data.into_bytes())); } Ok(results) } } /// Buffered writer with intelligent flushing strategies pub struct BufferedWriter { buffer_size: usize, flush_interval: Duration, write_buffer: Arc<Mutex<Vec<(CacheKey, Vec<u8>)>>>, flush_trigger: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>, } impl BufferedWriter { pub fn new(buffer_size: usize, flush_interval: Duration) -> Self { Self { buffer_size, flush_interval, write_buffer: Arc::new(Mutex::new(Vec::new())), flush_trigger: Arc::new(Mutex::new(None)), } } pub async fn buffer_writes<T>(&self, data: Vec<(CacheKey, T)>) -> Result<()> where T: serde::Serialize, { let mut buffer = self.write_buffer.lock().await; for (key, value) in data { let serialized = bincode::serialize(&value) .map_err(|e| CodeGraphError::Serialization(e.to_string()))?; buffer.push((key, serialized)); } // Trigger flush if buffer is full if buffer.len() >= self.buffer_size { drop(buffer); self.flush_buffer().await?; } else if buffer.len() == 1 { // Start flush timer for first write self.start_flush_timer().await; } Ok(()) } async fn start_flush_timer(&self) { let mut flush_trigger = self.flush_trigger.lock().await; if flush_trigger.is_none() { let buffer = Arc::clone(&self.write_buffer); let interval = self.flush_interval; let handle = tokio::spawn(async move { tokio::time::sleep(interval).await; // Flush the buffer after timeout let mut buf = buffer.lock().await; if !buf.is_empty() { // Perform actual flush operation buf.clear(); } }); *flush_trigger = Some(handle); } } async fn flush_buffer(&self) -> Result<()> { let mut buffer = self.write_buffer.lock().await; if buffer.is_empty() { return Ok(()); } // Perform batched write operation let batch_data = buffer.drain(..).collect::<Vec<_>>(); drop(buffer); // Simulate batched I/O write self.perform_batched_write(batch_data).await?; // Clear flush timer let mut flush_trigger = self.flush_trigger.lock().await; if let Some(handle) = flush_trigger.take() { handle.abort(); } Ok(()) } async fn perform_batched_write(&self, data: Vec<(CacheKey, Vec<u8>)>) -> Result<()> { // Placeholder for actual batched write implementation tokio::time::sleep(Duration::from_micros(data.len() as u64)).await; Ok(()) } } /// Intelligent prefetching engine based on access pattern prediction pub struct PrefetchEngine { prefetch_depth: usize, pattern_history_size: usize, access_patterns: Arc<RwLock<HashMap<CacheKey, Vec<CacheKey>>>>, prefetch_cache: Arc<RwLock<HashMap<CacheKey, Vec<u8>>>>, prediction_accuracy: Arc<SyncRwLock<PredictionMetrics>>, } #[derive(Debug, Default)] struct PredictionMetrics { predictions_made: u64, predictions_hit: u64, accuracy_percentage: f64, } impl PrefetchEngine { pub fn new(prefetch_depth: usize, pattern_history_size: usize) -> Self { Self { prefetch_depth, pattern_history_size, access_patterns: Arc::new(RwLock::new(HashMap::new())), prefetch_cache: Arc::new(RwLock::new(HashMap::new())), prediction_accuracy: Arc::new(SyncRwLock::new(PredictionMetrics::default())), } } pub async fn record_access_pattern(&self, keys: &[CacheKey]) { if keys.len() < 2 { return; } let mut patterns = self.access_patterns.write().await; for window in keys.windows(2) { let current = window[0]; let next = window[1]; patterns.entry(current) .or_insert_with(Vec::new) .push(next); // Maintain history size limit if let Some(history) = patterns.get_mut(&current) { if history.len() > self.pattern_history_size { history.remove(0); } } } } pub async fn check_prefetched<T>(&self, keys: &[CacheKey]) -> (Vec<Option<T>>, Vec<CacheKey>) where T: serde::de::DeserializeOwned, { let prefetch_cache = self.prefetch_cache.read().await; let mut hits = Vec::new(); let mut misses = Vec::new(); for &key in keys { if let Some(data) = prefetch_cache.get(&key) { match bincode::deserialize(data) { Ok(value) => { hits.push(Some(value)); self.record_prediction_hit().await; } Err(_) => { hits.push(None); misses.push(key); } } } else { hits.push(None); misses.push(key); } } (hits, misses) } async fn record_prediction_hit(&self) { let mut metrics = self.prediction_accuracy.write(); metrics.predictions_hit += 1; metrics.accuracy_percentage = if metrics.predictions_made > 0 { (metrics.predictions_hit as f64 / metrics.predictions_made as f64) * 100.0 } else { 0.0 }; } pub async fn prefetch_predicted_keys(&self, current_key: CacheKey) -> Result<()> { let patterns = self.access_patterns.read().await; if let Some(next_keys) = patterns.get(&current_key) { let keys_to_prefetch: Vec<_> = next_keys.iter() .take(self.prefetch_depth) .cloned() .collect(); drop(patterns); // Perform prefetch operation in background let prefetch_cache = Arc::clone(&self.prefetch_cache); let prediction_accuracy = Arc::clone(&self.prediction_accuracy); tokio::spawn(async move { for key in keys_to_prefetch { // Simulate prefetch I/O let data = format!("prefetched_data_for_{}", key.hash).into_bytes(); let mut cache = prefetch_cache.write().await; cache.insert(key, data); let mut metrics = prediction_accuracy.write(); metrics.predictions_made += 1; } }); } Ok(()) } } /// Compression layer for reducing I/O bandwidth requirements pub struct CompressionLayer { compression_threshold: usize, } impl CompressionLayer { pub fn new(compression_threshold: usize) -> Self { Self { compression_threshold, } } pub async fn compress_batch<T>(&self, data: &[(CacheKey, T)]) -> Result<Vec<(CacheKey, Vec<u8>)>> where T: serde::Serialize, { let mut results = Vec::with_capacity(data.len()); for (key, value) in data { let serialized = bincode::serialize(value) .map_err(|e| CodeGraphError::Serialization(e.to_string()))?; let final_data = if serialized.len() > self.compression_threshold { self.compress_data(&serialized).await? } else { serialized }; results.push((*key, final_data)); } Ok(results) } async fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> { // Simplified compression using flate2 (gzip) use flate2::{Compression, write::GzEncoder}; use std::io::Write; let compressed = tokio::task::spawn_blocking({ let data = data.to_vec(); move || { let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); encoder.write_all(&data)?; encoder.finish() } }).await .map_err(|e| CodeGraphError::Compression(e.to_string()))? .map_err(|e| CodeGraphError::Compression(e.to_string()))?; Ok(compressed) } pub async fn decompress_data(&self, compressed: &[u8]) -> Result<Vec<u8>> { use flate2::read::GzDecoder; use std::io::Read; let decompressed = tokio::task::spawn_blocking({ let compressed = compressed.to_vec(); move || { let mut decoder = GzDecoder::new(&compressed[..]); let mut decompressed = Vec::new(); decoder.read_to_end(&mut decompressed)?; Ok::<Vec<u8>, std::io::Error>(decompressed) } }).await .map_err(|e| CodeGraphError::Compression(e.to_string()))? .map_err(|e| CodeGraphError::Compression(e.to_string()))?; Ok(decompressed) } } #[derive(Debug, Clone)] pub struct IOConfig { pub batch_size: usize, pub buffer_size: usize, pub flush_interval: Duration, pub prefetch_depth: usize, pub pattern_history: usize, pub compression_threshold: usize, } impl Default for IOConfig { fn default() -> Self { Self { batch_size: 100, // Batch up to 100 operations buffer_size: 1000, // Buffer up to 1000 writes flush_interval: Duration::from_millis(50), // Flush every 50ms prefetch_depth: 10, // Prefetch up to 10 items pattern_history: 100, // Remember 100 access patterns compression_threshold: 1024, // Compress data > 1KB } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server