// optimized_io.rs
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, Mutex, Semaphore};
use tokio::time::timeout;
use parking_lot::RwLock as SyncRwLock;
use crate::CacheKey;
use codegraph_core::{CodeGraphError, Result};
/// High-performance I/O optimization strategies (batched reads, buffered writes,
/// predictive prefetching, and compression), targeting roughly a 3x throughput improvement.
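///
/// A minimal usage sketch; `key_a`, `key_b`, `node_a`, `node_b`, and `MyNode`
/// are illustrative placeholders, not items defined in this crate:
///
/// ```ignore
/// let manager = OptimizedIOManager::new(IOConfig::default());
///
/// // Writes are serialized, optionally compressed, and buffered for batched flushing.
/// manager.optimized_write(vec![(key_a, node_a), (key_b, node_b)]).await?;
///
/// // Reads consult the prefetch cache first, then fall back to batched I/O.
/// let nodes: Vec<Option<MyNode>> = manager.optimized_read(vec![key_a, key_b]).await?;
/// ```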
pub struct OptimizedIOManager {
/// Batched read operations to reduce I/O overhead
batch_reader: Arc<BatchedIOReader>,
/// Intelligent write buffering and flushing
write_buffer: Arc<BufferedWriter>,
/// Predictive prefetching based on access patterns
prefetch_engine: Arc<PrefetchEngine>,
/// Compressed storage for large data
compression_layer: Arc<CompressionLayer>,
/// Performance metrics and monitoring
io_metrics: Arc<SyncRwLock<IOMetrics>>,
}
#[derive(Debug, Default, Clone)]
pub struct IOMetrics {
pub total_reads: u64,
pub total_writes: u64,
pub cache_hits: u64,
pub cache_misses: u64,
pub bytes_read: u64,
pub bytes_written: u64,
pub avg_read_latency_ms: f64,
pub avg_write_latency_ms: f64,
pub compression_ratio: f64,
pub prefetch_accuracy: f64,
}
impl OptimizedIOManager {
pub fn new(config: IOConfig) -> Self {
let batch_reader = Arc::new(BatchedIOReader::new(config.batch_size));
let write_buffer = Arc::new(BufferedWriter::new(config.buffer_size, config.flush_interval));
let prefetch_engine = Arc::new(PrefetchEngine::new(config.prefetch_depth, config.pattern_history));
let compression_layer = Arc::new(CompressionLayer::new(config.compression_threshold));
Self {
batch_reader,
write_buffer,
prefetch_engine,
compression_layer,
io_metrics: Arc::new(SyncRwLock::new(IOMetrics::default())),
}
}
pub async fn optimized_read<T>(&self, keys: Vec<CacheKey>) -> Result<Vec<Option<T>>>
where
T: serde::de::DeserializeOwned + Send + 'static,
{
let start = Instant::now();
// 1. Check prefetch cache first
let (prefetch_hits, remaining_keys) = self.prefetch_engine.check_prefetched(&keys).await;
// 2. Batch read remaining keys
let batch_results = if !remaining_keys.is_empty() {
self.batch_reader.batch_read(remaining_keys).await?
} else {
Vec::new()
};
        // 3. Merge prefetch hits with batch results, preserving the original key order
        let merged_results = self.merge_results(prefetch_hits, batch_results);
        // 4. Update metrics (only keys that were actually found count as hits)
        let found = merged_results.iter().filter(|r| r.is_some()).count();
        let elapsed = start.elapsed();
        self.update_read_metrics(keys.len(), found, elapsed).await;
// 5. Update access patterns for future prefetching
self.prefetch_engine.record_access_pattern(&keys).await;
Ok(merged_results)
}
pub async fn optimized_write<T>(&self, data: Vec<(CacheKey, T)>) -> Result<()>
where
T: serde::Serialize + Send + 'static,
{
let start = Instant::now();
// 1. Compress data if beneficial
let compressed_data = self.compression_layer.compress_batch(&data).await?;
// 2. Buffer writes for efficient batching
self.write_buffer.buffer_writes(compressed_data).await?;
// 3. Update metrics
let elapsed = start.elapsed();
self.update_write_metrics(data.len(), elapsed).await;
Ok(())
}
pub async fn get_metrics(&self) -> IOMetrics {
self.io_metrics.read().clone()
}
    fn merge_results<T>(&self, prefetch_hits: Vec<Option<T>>, batch_results: Vec<Option<T>>) -> Vec<Option<T>> {
        // `prefetch_hits` has one slot per requested key; fill each miss (None)
        // with the next batch result so the output stays aligned with the input keys.
        let mut batch_iter = batch_results.into_iter();
        prefetch_hits
            .into_iter()
            .map(|hit| match hit {
                Some(value) => Some(value),
                None => batch_iter.next().flatten(),
            })
            .collect()
    }
async fn update_read_metrics(&self, requested: usize, found: usize, latency: Duration) {
let mut metrics = self.io_metrics.write();
metrics.total_reads += requested as u64;
metrics.cache_hits += found as u64;
        metrics.cache_misses += requested.saturating_sub(found) as u64;
        let latency_ms = latency.as_secs_f64() * 1000.0;
        // Exponential moving average, seeded with the first observed latency.
        metrics.avg_read_latency_ms = if metrics.avg_read_latency_ms == 0.0 {
            latency_ms
        } else {
            (metrics.avg_read_latency_ms + latency_ms) / 2.0
        };
}
async fn update_write_metrics(&self, count: usize, latency: Duration) {
let mut metrics = self.io_metrics.write();
metrics.total_writes += count as u64;
        let latency_ms = latency.as_secs_f64() * 1000.0;
        // Exponential moving average, seeded with the first observed latency.
        metrics.avg_write_latency_ms = if metrics.avg_write_latency_ms == 0.0 {
            latency_ms
        } else {
            (metrics.avg_write_latency_ms + latency_ms) / 2.0
        };
}
}
/// Batched I/O reader for reducing system call overhead
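///
/// A minimal usage sketch; `keys` is an illustrative placeholder and stored
/// values are assumed to have been serialized with `bincode`:
///
/// ```ignore
/// let reader = BatchedIOReader::new(64);
/// // One slot per requested key: `Some(value)` when found, `None` otherwise.
/// let values: Vec<Option<Vec<u8>>> = reader.batch_read(keys).await?;
/// ```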
pub struct BatchedIOReader {
batch_size: usize,
pending_reads: Arc<Mutex<VecDeque<ReadRequest>>>,
batch_processor: Arc<Semaphore>,
}
#[derive(Debug)]
struct ReadRequest {
key: CacheKey,
response_sender: tokio::sync::oneshot::Sender<Option<Vec<u8>>>,
}
impl BatchedIOReader {
pub fn new(batch_size: usize) -> Self {
Self {
batch_size,
pending_reads: Arc::new(Mutex::new(VecDeque::new())),
batch_processor: Arc::new(Semaphore::new(1)),
}
}
pub async fn batch_read<T>(&self, keys: Vec<CacheKey>) -> Result<Vec<Option<T>>>
where
T: serde::de::DeserializeOwned,
{
let mut results = Vec::with_capacity(keys.len());
let mut receivers = Vec::new();
// Queue read requests
{
let mut pending = self.pending_reads.lock().await;
for key in keys {
let (sender, receiver) = tokio::sync::oneshot::channel();
pending.push_back(ReadRequest {
key,
response_sender: sender,
});
receivers.push(receiver);
}
}
        // Drain full batches first, then flush the remainder so these requests
        // are never left waiting on a partially filled batch.
        while self.should_process_batch().await {
            self.process_batch().await?;
        }
        self.process_batch().await?;
// Collect results
for receiver in receivers {
match timeout(Duration::from_millis(100), receiver).await {
Ok(Ok(Some(data))) => {
let deserialized: T = bincode::deserialize(&data)
.map_err(|e| CodeGraphError::Serialization(e.to_string()))?;
results.push(Some(deserialized));
}
_ => results.push(None),
}
}
Ok(results)
}
async fn should_process_batch(&self) -> bool {
let pending = self.pending_reads.lock().await;
pending.len() >= self.batch_size
}
async fn process_batch(&self) -> Result<()> {
let _permit = self.batch_processor.acquire().await
.map_err(|_| CodeGraphError::Concurrency("Failed to acquire batch processor".into()))?;
let mut batch = Vec::new();
// Extract batch
{
let mut pending = self.pending_reads.lock().await;
for _ in 0..self.batch_size.min(pending.len()) {
if let Some(request) = pending.pop_front() {
batch.push(request);
}
}
}
if batch.is_empty() {
return Ok(());
}
// Simulate batched I/O operation (replace with actual implementation)
let results = self.perform_batched_io(&batch).await?;
// Send results back
for (request, result) in batch.into_iter().zip(results) {
let _ = request.response_sender.send(result);
}
Ok(())
}
async fn perform_batched_io(&self, batch: &[ReadRequest]) -> Result<Vec<Option<Vec<u8>>>> {
// Placeholder for actual I/O implementation
// In practice, this would interface with RocksDB, FAISS, or other storage
let mut results = Vec::with_capacity(batch.len());
for request in batch {
// Simulate I/O delay and data retrieval
tokio::time::sleep(Duration::from_micros(10)).await;
// Mock data based on key hash (replace with real I/O)
let data = format!("data_for_key_{}", request.key.hash);
results.push(Some(data.into_bytes()));
}
Ok(results)
}
}
/// Buffered writer with intelligent flushing strategies
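///
/// Writes accumulate until either `buffer_size` entries are pending or
/// `flush_interval` has elapsed, whichever comes first. A minimal usage sketch;
/// `key` and `value` are illustrative placeholders:
///
/// ```ignore
/// let writer = BufferedWriter::new(1_000, Duration::from_millis(50));
/// writer.buffer_writes(vec![(key, value)]).await?;
/// // Small bursts are picked up by the background flush timer;
/// // full buffers are flushed immediately.
/// ```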
pub struct BufferedWriter {
buffer_size: usize,
flush_interval: Duration,
write_buffer: Arc<Mutex<Vec<(CacheKey, Vec<u8>)>>>,
flush_trigger: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}
impl BufferedWriter {
pub fn new(buffer_size: usize, flush_interval: Duration) -> Self {
Self {
buffer_size,
flush_interval,
write_buffer: Arc::new(Mutex::new(Vec::new())),
flush_trigger: Arc::new(Mutex::new(None)),
}
}
pub async fn buffer_writes<T>(&self, data: Vec<(CacheKey, T)>) -> Result<()>
where
T: serde::Serialize,
{
let mut buffer = self.write_buffer.lock().await;
for (key, value) in data {
let serialized = bincode::serialize(&value)
.map_err(|e| CodeGraphError::Serialization(e.to_string()))?;
buffer.push((key, serialized));
}
        // Flush immediately if the buffer is full; otherwise make sure a flush
        // timer is armed so small bursts are still written out.
        if buffer.len() >= self.buffer_size {
            drop(buffer);
            self.flush_buffer().await?;
        } else {
            drop(buffer);
            self.start_flush_timer().await;
        }
Ok(())
}
    async fn start_flush_timer(&self) {
        let mut flush_trigger = self.flush_trigger.lock().await;
        if flush_trigger.is_none() {
            let buffer = Arc::clone(&self.write_buffer);
            let trigger = Arc::clone(&self.flush_trigger);
            let interval = self.flush_interval;
            let handle = tokio::spawn(async move {
                tokio::time::sleep(interval).await;
                // Drain whatever accumulated during the interval and write it out
                // (placeholder write; in practice this hits the storage backend).
                let batch: Vec<(CacheKey, Vec<u8>)> = buffer.lock().await.drain(..).collect();
                if !batch.is_empty() {
                    tokio::time::sleep(Duration::from_micros(batch.len() as u64)).await;
                }
                // Disarm the timer so the next write can schedule a fresh one.
                *trigger.lock().await = None;
            });
            *flush_trigger = Some(handle);
        }
    }
async fn flush_buffer(&self) -> Result<()> {
let mut buffer = self.write_buffer.lock().await;
if buffer.is_empty() {
return Ok(());
}
// Perform batched write operation
let batch_data = buffer.drain(..).collect::<Vec<_>>();
drop(buffer);
// Simulate batched I/O write
self.perform_batched_write(batch_data).await?;
// Clear flush timer
let mut flush_trigger = self.flush_trigger.lock().await;
if let Some(handle) = flush_trigger.take() {
handle.abort();
}
Ok(())
}
async fn perform_batched_write(&self, data: Vec<(CacheKey, Vec<u8>)>) -> Result<()> {
// Placeholder for actual batched write implementation
tokio::time::sleep(Duration::from_micros(data.len() as u64)).await;
Ok(())
}
}
/// Intelligent prefetching engine based on access pattern prediction
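///
/// The engine learns "key A is usually followed by key B" transitions from
/// observed reads and warms a small cache for the predicted successors.
/// A minimal usage sketch; `key_a` and `key_b` are illustrative placeholders:
///
/// ```ignore
/// let engine = PrefetchEngine::new(10, 100);
/// engine.record_access_pattern(&[key_a, key_b]).await;   // learn A -> B
/// engine.prefetch_predicted_keys(key_a).await?;          // warm the cache for B
/// let (hits, misses) = engine.check_prefetched::<Vec<u8>>(&[key_b]).await;
/// ```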
pub struct PrefetchEngine {
prefetch_depth: usize,
pattern_history_size: usize,
access_patterns: Arc<RwLock<HashMap<CacheKey, Vec<CacheKey>>>>,
prefetch_cache: Arc<RwLock<HashMap<CacheKey, Vec<u8>>>>,
prediction_accuracy: Arc<SyncRwLock<PredictionMetrics>>,
}
#[derive(Debug, Default)]
struct PredictionMetrics {
predictions_made: u64,
predictions_hit: u64,
accuracy_percentage: f64,
}
impl PrefetchEngine {
pub fn new(prefetch_depth: usize, pattern_history_size: usize) -> Self {
Self {
prefetch_depth,
pattern_history_size,
access_patterns: Arc::new(RwLock::new(HashMap::new())),
prefetch_cache: Arc::new(RwLock::new(HashMap::new())),
prediction_accuracy: Arc::new(SyncRwLock::new(PredictionMetrics::default())),
}
}
pub async fn record_access_pattern(&self, keys: &[CacheKey]) {
if keys.len() < 2 {
return;
}
let mut patterns = self.access_patterns.write().await;
for window in keys.windows(2) {
let current = window[0];
let next = window[1];
            // Record the transition and cap the per-key history length.
            let history = patterns.entry(current).or_insert_with(Vec::new);
            history.push(next);
            if history.len() > self.pattern_history_size {
                history.remove(0);
            }
}
}
pub async fn check_prefetched<T>(&self, keys: &[CacheKey]) -> (Vec<Option<T>>, Vec<CacheKey>)
where
T: serde::de::DeserializeOwned,
{
let prefetch_cache = self.prefetch_cache.read().await;
let mut hits = Vec::new();
let mut misses = Vec::new();
for &key in keys {
if let Some(data) = prefetch_cache.get(&key) {
match bincode::deserialize(data) {
Ok(value) => {
hits.push(Some(value));
self.record_prediction_hit().await;
}
Err(_) => {
hits.push(None);
misses.push(key);
}
}
} else {
hits.push(None);
misses.push(key);
}
}
(hits, misses)
}
async fn record_prediction_hit(&self) {
let mut metrics = self.prediction_accuracy.write();
metrics.predictions_hit += 1;
metrics.accuracy_percentage = if metrics.predictions_made > 0 {
(metrics.predictions_hit as f64 / metrics.predictions_made as f64) * 100.0
} else {
0.0
};
}
pub async fn prefetch_predicted_keys(&self, current_key: CacheKey) -> Result<()> {
let patterns = self.access_patterns.read().await;
        if let Some(next_keys) = patterns.get(&current_key) {
let keys_to_prefetch: Vec<_> = next_keys.iter()
.take(self.prefetch_depth)
.cloned()
.collect();
drop(patterns);
// Perform prefetch operation in background
let prefetch_cache = Arc::clone(&self.prefetch_cache);
let prediction_accuracy = Arc::clone(&self.prediction_accuracy);
tokio::spawn(async move {
for key in keys_to_prefetch {
// Simulate prefetch I/O
let data = format!("prefetched_data_for_{}", key.hash).into_bytes();
let mut cache = prefetch_cache.write().await;
cache.insert(key, data);
let mut metrics = prediction_accuracy.write();
metrics.predictions_made += 1;
}
});
}
Ok(())
}
}
/// Compression layer for reducing I/O bandwidth requirements
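///
/// Values are serialized with `bincode`; payloads larger than
/// `compression_threshold` bytes are additionally gzip-compressed. A minimal
/// usage sketch; `key` and `large_value` are illustrative placeholders:
///
/// ```ignore
/// let layer = CompressionLayer::new(1024);
/// let compressed = layer.compress_batch(&[(key, large_value)]).await?;
/// // Only payloads that exceeded the threshold are gzip data and need decompression.
/// let original_bytes = layer.decompress_data(&compressed[0].1).await?;
/// ```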
pub struct CompressionLayer {
compression_threshold: usize,
}
impl CompressionLayer {
pub fn new(compression_threshold: usize) -> Self {
Self {
compression_threshold,
}
}
pub async fn compress_batch<T>(&self, data: &[(CacheKey, T)]) -> Result<Vec<(CacheKey, Vec<u8>)>>
where
T: serde::Serialize,
{
let mut results = Vec::with_capacity(data.len());
for (key, value) in data {
let serialized = bincode::serialize(value)
.map_err(|e| CodeGraphError::Serialization(e.to_string()))?;
let final_data = if serialized.len() > self.compression_threshold {
self.compress_data(&serialized).await?
} else {
serialized
};
results.push((*key, final_data));
}
Ok(results)
}
async fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
// Simplified compression using flate2 (gzip)
use flate2::{Compression, write::GzEncoder};
use std::io::Write;
let compressed = tokio::task::spawn_blocking({
let data = data.to_vec();
move || {
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(&data)?;
encoder.finish()
}
}).await
.map_err(|e| CodeGraphError::Compression(e.to_string()))?
.map_err(|e| CodeGraphError::Compression(e.to_string()))?;
Ok(compressed)
}
pub async fn decompress_data(&self, compressed: &[u8]) -> Result<Vec<u8>> {
use flate2::read::GzDecoder;
use std::io::Read;
let decompressed = tokio::task::spawn_blocking({
let compressed = compressed.to_vec();
move || {
let mut decoder = GzDecoder::new(&compressed[..]);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed)?;
Ok::<Vec<u8>, std::io::Error>(decompressed)
}
}).await
.map_err(|e| CodeGraphError::Compression(e.to_string()))?
.map_err(|e| CodeGraphError::Compression(e.to_string()))?;
Ok(decompressed)
}
}
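/// Tunable parameters for the I/O optimization pipeline.
///
/// A minimal sketch of overriding selected knobs while keeping the defaults
/// for everything else:
///
/// ```ignore
/// let config = IOConfig {
///     batch_size: 256,
///     compression_threshold: 4 * 1024,
///     ..IOConfig::default()
/// };
/// let manager = OptimizedIOManager::new(config);
/// ```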
#[derive(Debug, Clone)]
pub struct IOConfig {
pub batch_size: usize,
pub buffer_size: usize,
pub flush_interval: Duration,
pub prefetch_depth: usize,
pub pattern_history: usize,
pub compression_threshold: usize,
}
impl Default for IOConfig {
fn default() -> Self {
Self {
batch_size: 100, // Batch up to 100 operations
buffer_size: 1000, // Buffer up to 1000 writes
flush_interval: Duration::from_millis(50), // Flush every 50ms
prefetch_depth: 10, // Prefetch up to 10 items
pattern_history: 100, // Remember 100 access patterns
compression_threshold: 1024, // Compress data > 1KB
}
}
}
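
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal round-trip check for the gzip helpers. It avoids constructing
    // `CacheKey` values and assumes tokio's test macro is available in this crate.
    #[tokio::test]
    async fn gzip_round_trip() {
        let layer = CompressionLayer::new(1024);
        let payload = vec![42u8; 4096];
        let compressed = layer.compress_data(&payload).await.unwrap();
        assert!(compressed.len() < payload.len());
        let restored = layer.decompress_data(&compressed).await.unwrap();
        assert_eq!(restored, payload);
    }

    #[test]
    fn default_config_matches_documented_values() {
        let config = IOConfig::default();
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.buffer_size, 1000);
        assert_eq!(config.compression_threshold, 1024);
    }
}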