Skip to main content
Glama

CodeGraph CLI MCP Server

by Jakedismo
mmap.rs•16.5 kB
//! Memory-mapped file access patterns for zero-copy I/O //! //! This module provides safe abstractions over memory-mapped files for high-performance //! data access without copying. use crate::{ZeroCopyError, ZeroCopyResult}; use memmap2::{Advice, Mmap, MmapMut, MmapOptions}; use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use rkyv::api::high::HighValidator; use rkyv::{access, access_unchecked, Archive}; use std::{ fs::{File, OpenOptions}, io::{Seek, SeekFrom}, path::{Path, PathBuf}, sync::Arc, }; use tracing::{debug, error, instrument, warn}; /// Memory-mapped file reader with zero-copy access pub struct MmapReader { mmap: Mmap, path: PathBuf, } impl MmapReader { /// Open a file for memory-mapped reading #[instrument(skip(path))] pub fn open<P: AsRef<Path>>(path: P) -> ZeroCopyResult<Self> { let path = path.as_ref().to_path_buf(); let file = File::open(&path)?; let mmap = unsafe { MmapOptions::new().map(&file)? }; debug!( "Opened memory-mapped file: {:?}, size: {} bytes", path, mmap.len() ); Ok(Self { mmap, path }) } /// Get the raw bytes of the memory-mapped file pub fn as_bytes(&self) -> &[u8] { &self.mmap } /// Get the size of the mapped file pub fn len(&self) -> usize { self.mmap.len() } /// Check if the mapped file is empty pub fn is_empty(&self) -> bool { self.mmap.is_empty() } /// Access archived data directly from the memory-mapped file #[instrument(skip(self))] pub fn access_archived<T>(&self) -> ZeroCopyResult<&T::Archived> where T: Archive, T::Archived: for<'a> bytecheck::CheckBytes<HighValidator<'a, rkyv::rancor::Failure>>, { access::<T::Archived, rkyv::rancor::Failure>(&self.mmap).map_err(|_e| { ZeroCopyError::ArchiveAccess("Failed to access archived data".to_string()) }) } /// Access archived data without validation (faster but unsafe) #[instrument(skip(self))] pub fn access_archived_unchecked<T>(&self) -> &T::Archived where T: Archive, T::Archived: rkyv::Portable, { unsafe { access_unchecked::<T::Archived>(&self.mmap) } } /// Get a slice of the mapped data pub fn slice(&self, start: usize, len: usize) -> ZeroCopyResult<&[u8]> { if start + len > self.mmap.len() { return Err(ZeroCopyError::Buffer(format!( "Slice bounds out of range: {}..{} > {}", start, start + len, self.mmap.len() ))); } Ok(&self.mmap[start..start + len]) } /// Advise the kernel about memory access patterns pub fn advise(&self, advice: Advice) -> ZeroCopyResult<()> { self.mmap.advise(advice)?; Ok(()) } /// Get the path of the mapped file pub fn path(&self) -> &Path { &self.path } } /// Memory-mapped file writer with zero-copy writes pub struct MmapWriter { mmap: MmapMut, path: PathBuf, file: File, } impl MmapWriter { /// Create a new memory-mapped file for writing #[instrument(skip(path))] pub fn create<P: AsRef<Path>>(path: P, size: usize) -> ZeroCopyResult<Self> { let path = path.as_ref().to_path_buf(); let file = OpenOptions::new() .read(true) .write(true) .create(true) .truncate(true) .open(&path)?; // Set the file size file.set_len(size as u64)?; let mmap = unsafe { MmapOptions::new().map_mut(&file)? }; debug!( "Created memory-mapped file: {:?}, size: {} bytes", path, size ); Ok(Self { mmap, path, file }) } /// Open an existing file for memory-mapped writing #[instrument(skip(path))] pub fn open<P: AsRef<Path>>(path: P) -> ZeroCopyResult<Self> { let path = path.as_ref().to_path_buf(); let file = OpenOptions::new().read(true).write(true).open(&path)?; let mmap = unsafe { MmapOptions::new().map_mut(&file)? }; debug!( "Opened memory-mapped file for writing: {:?}, size: {} bytes", path, mmap.len() ); Ok(Self { mmap, path, file }) } /// Get mutable access to the mapped bytes pub fn as_mut_bytes(&mut self) -> &mut [u8] { &mut self.mmap } /// Get the size of the mapped file pub fn len(&self) -> usize { self.mmap.len() } /// Check if the mapped file is empty pub fn is_empty(&self) -> bool { self.mmap.is_empty() } /// Write data at a specific offset pub fn write_at(&mut self, offset: usize, data: &[u8]) -> ZeroCopyResult<()> { if offset + data.len() > self.mmap.len() { return Err(ZeroCopyError::Buffer(format!( "Write bounds out of range: {}..{} > {}", offset, offset + data.len(), self.mmap.len() ))); } self.mmap[offset..offset + data.len()].copy_from_slice(data); Ok(()) } /// Flush changes to disk #[instrument(skip(self))] pub fn flush(&self) -> ZeroCopyResult<()> { self.mmap.flush()?; Ok(()) } /// Flush changes to disk asynchronously #[instrument(skip(self))] pub fn flush_async(&self) -> ZeroCopyResult<()> { self.mmap.flush_async()?; Ok(()) } /// Advise the kernel about memory access patterns pub fn advise(&self, advice: Advice) -> ZeroCopyResult<()> { self.mmap.advise(advice)?; Ok(()) } /// Get the path of the mapped file pub fn path(&self) -> &Path { &self.path } /// Resize the mapped file #[instrument(skip(self))] pub fn resize(&mut self, new_size: usize) -> ZeroCopyResult<()> { // Resize the underlying file self.file.set_len(new_size as u64)?; // Create a new mapping (old mapping will be dropped automatically) self.mmap = unsafe { MmapOptions::new().map_mut(&self.file)? }; debug!( "Resized memory-mapped file: {:?}, new size: {} bytes", self.path, new_size ); Ok(()) } } /// Thread-safe memory-mapped file with read-write locking pub struct ThreadSafeMmap { mmap: Arc<RwLock<MmapMut>>, path: PathBuf, } impl ThreadSafeMmap { /// Create a new thread-safe memory-mapped file #[instrument(skip(path))] pub fn create<P: AsRef<Path>>(path: P, size: usize) -> ZeroCopyResult<Self> { let path = path.as_ref().to_path_buf(); let file = OpenOptions::new() .read(true) .write(true) .create(true) .truncate(true) .open(&path)?; file.set_len(size as u64)?; let mmap = unsafe { MmapOptions::new().map_mut(&file)? }; Ok(Self { mmap: Arc::new(RwLock::new(mmap)), path, }) } /// Get a read lock on the mapped data pub fn read(&self) -> ThreadSafeMmapReadGuard { ThreadSafeMmapReadGuard { guard: self.mmap.read(), } } /// Get a write lock on the mapped data pub fn write(&self) -> ThreadSafeMmapWriteGuard { ThreadSafeMmapWriteGuard { guard: self.mmap.write(), } } /// Get the path of the mapped file pub fn path(&self) -> &Path { &self.path } } /// Read guard for thread-safe memory-mapped file pub struct ThreadSafeMmapReadGuard<'a> { guard: RwLockReadGuard<'a, MmapMut>, } impl<'a> ThreadSafeMmapReadGuard<'a> { /// Get the mapped bytes pub fn as_bytes(&self) -> &[u8] { &self.guard } /// Access archived data pub fn access_archived<T>(&self) -> ZeroCopyResult<&T::Archived> where T: Archive, T::Archived: for<'b> bytecheck::CheckBytes<HighValidator<'b, rkyv::rancor::Failure>>, { access::<T::Archived, rkyv::rancor::Failure>(&self.guard).map_err(|_e| { ZeroCopyError::ArchiveAccess("Failed to access archived data".to_string()) }) } } /// Write guard for thread-safe memory-mapped file pub struct ThreadSafeMmapWriteGuard<'a> { guard: RwLockWriteGuard<'a, MmapMut>, } impl<'a> ThreadSafeMmapWriteGuard<'a> { /// Get mutable access to the mapped bytes pub fn as_mut_bytes(&mut self) -> &mut [u8] { &mut self.guard } /// Write data at a specific offset pub fn write_at(&mut self, offset: usize, data: &[u8]) -> ZeroCopyResult<()> { if offset + data.len() > self.guard.len() { return Err(ZeroCopyError::Buffer(format!( "Write bounds out of range: {}..{} > {}", offset, offset + data.len(), self.guard.len() ))); } self.guard[offset..offset + data.len()].copy_from_slice(data); Ok(()) } /// Flush changes to disk pub fn flush(&self) -> ZeroCopyResult<()> { self.guard.flush()?; Ok(()) } } /// Memory-mapped circular buffer for streaming data pub struct MmapCircularBuffer { mmap: MmapMut, head: Arc<parking_lot::Mutex<usize>>, tail: Arc<parking_lot::Mutex<usize>>, capacity: usize, path: PathBuf, } impl MmapCircularBuffer { /// Create a new memory-mapped circular buffer #[instrument(skip(path))] pub fn create<P: AsRef<Path>>(path: P, capacity: usize) -> ZeroCopyResult<Self> { let path = path.as_ref().to_path_buf(); // Add space for metadata (head and tail pointers) let total_size = capacity + std::mem::size_of::<usize>() * 2; let file = OpenOptions::new() .read(true) .write(true) .create(true) .truncate(true) .open(&path)?; file.set_len(total_size as u64)?; let mmap = unsafe { MmapOptions::new().map_mut(&file)? }; Ok(Self { mmap, head: Arc::new(parking_lot::Mutex::new(0)), tail: Arc::new(parking_lot::Mutex::new(0)), capacity, path, }) } /// Write data to the circular buffer pub fn write(&mut self, data: &[u8]) -> ZeroCopyResult<usize> { let mut head = self.head.lock(); let tail = *self.tail.lock(); let available = if *head >= tail { self.capacity - *head + tail } else { tail - *head }; if available < data.len() { return Err(ZeroCopyError::Buffer( "Insufficient space in circular buffer".to_string(), )); } let metadata_offset = std::mem::size_of::<usize>() * 2; let write_size = data.len().min(available); // Handle wrap-around if *head + write_size <= self.capacity { // No wrap-around needed self.mmap[metadata_offset + *head..metadata_offset + *head + write_size] .copy_from_slice(&data[..write_size]); } else { // Split write due to wrap-around let first_part = self.capacity - *head; self.mmap[metadata_offset + *head..metadata_offset + self.capacity] .copy_from_slice(&data[..first_part]); self.mmap[metadata_offset..metadata_offset + write_size - first_part] .copy_from_slice(&data[first_part..write_size]); } *head = (*head + write_size) % self.capacity; Ok(write_size) } /// Read data from the circular buffer pub fn read(&mut self, buffer: &mut [u8]) -> ZeroCopyResult<usize> { let head = *self.head.lock(); let mut tail = self.tail.lock(); let available = if head >= *tail { head - *tail } else { self.capacity - *tail + head }; if available == 0 { return Ok(0); } let metadata_offset = std::mem::size_of::<usize>() * 2; let read_size = buffer.len().min(available); // Handle wrap-around if *tail + read_size <= self.capacity { // No wrap-around needed buffer[..read_size].copy_from_slice( &self.mmap[metadata_offset + *tail..metadata_offset + *tail + read_size], ); } else { // Split read due to wrap-around let first_part = self.capacity - *tail; buffer[..first_part].copy_from_slice( &self.mmap[metadata_offset + *tail..metadata_offset + self.capacity], ); buffer[first_part..read_size].copy_from_slice( &self.mmap[metadata_offset..metadata_offset + read_size - first_part], ); } *tail = (*tail + read_size) % self.capacity; Ok(read_size) } /// Get the amount of data available to read pub fn available(&self) -> usize { let head = *self.head.lock(); let tail = *self.tail.lock(); if head >= tail { head - tail } else { self.capacity - tail + head } } /// Get the amount of space available for writing pub fn space_available(&self) -> usize { self.capacity - self.available() } } #[cfg(test)] mod tests { use super::*; use rkyv::{Archive, Deserialize, Serialize}; use tempfile::NamedTempFile; #[derive(Archive, Serialize, Deserialize, Debug, PartialEq)] struct TestData { id: u64, name: String, values: Vec<i32>, } #[test] fn test_mmap_reader() { let temp_file = NamedTempFile::new().unwrap(); let path = temp_file.path(); // Write test data let data = TestData { id: 42, name: "test".to_string(), values: vec![1, 2, 3, 4, 5], }; let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&data).unwrap(); std::fs::write(path, &bytes).unwrap(); // Test memory-mapped reading let reader = MmapReader::open(path).unwrap(); assert_eq!(reader.len(), bytes.len()); assert!(!reader.is_empty()); let archived = reader.access_archived::<TestData>().unwrap(); assert_eq!(archived.id, 42); assert_eq!(archived.name, "test"); } #[test] fn test_mmap_writer() { let temp_file = NamedTempFile::new().unwrap(); let path = temp_file.path(); let test_data = b"Hello, memory-mapped world!"; // Create and write to memory-mapped file let mut writer = MmapWriter::create(path, test_data.len()).unwrap(); writer.write_at(0, test_data).unwrap(); writer.flush().unwrap(); // Verify the data was written let content = std::fs::read(path).unwrap(); assert_eq!(content, test_data); } #[test] fn test_thread_safe_mmap() { let temp_file = NamedTempFile::new().unwrap(); let path = temp_file.path(); let mmap = ThreadSafeMmap::create(path, 1024).unwrap(); // Test concurrent access { let mut write_guard = mmap.write(); let test_data = b"thread-safe test"; write_guard.write_at(0, test_data).unwrap(); write_guard.flush().unwrap(); } { let read_guard = mmap.read(); let bytes = read_guard.as_bytes(); assert_eq!(&bytes[..16], b"thread-safe test"); } } #[test] fn test_mmap_circular_buffer() { let temp_file = NamedTempFile::new().unwrap(); let path = temp_file.path(); let mut buffer = MmapCircularBuffer::create(path, 10).unwrap(); // Test writing and reading let write_data = b"hello"; let written = buffer.write(write_data).unwrap(); assert_eq!(written, 5); let mut read_buffer = [0u8; 10]; let read = buffer.read(&mut read_buffer).unwrap(); assert_eq!(read, 5); assert_eq!(&read_buffer[..5], b"hello"); // Test wrap-around let write_data2 = b"worldtest"; let written2 = buffer.write(write_data2).unwrap(); assert_eq!(written2, 9); let read2 = buffer.read(&mut read_buffer).unwrap(); assert_eq!(read2, 9); assert_eq!(&read_buffer[..9], b"worldtest"); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server