Skip to main content
Glama

CodeGraph CLI MCP Server

by Jakedismo
serialization.rs10.3 kB
//! Zero-copy serialization patterns using rkyv //! //! This module provides high-performance serialization without data copying. use crate::{ZeroCopyError, ZeroCopyResult}; use bytes::{Bytes, BytesMut}; use rkyv::api::high::{HighDeserializer, HighSerializer, HighValidator}; use rkyv::ser::allocator::ArenaHandle; use rkyv::util::AlignedVec; use rkyv::{ access, access_unchecked, deserialize, from_bytes, from_bytes_unchecked, rancor::Failure, to_bytes, Archive, Deserialize, Serialize, }; use std::sync::Arc; use tracing::{debug, instrument, trace}; /// A zero-copy serializer that reuses buffers #[derive(Debug)] pub struct ZeroCopySerializer { buffer: BytesMut, alignment: usize, } impl ZeroCopySerializer { /// Create a new serializer with default buffer capacity pub fn new() -> Self { Self::with_capacity(4096) } /// Create a new serializer with specified capacity pub fn with_capacity(capacity: usize) -> Self { Self { buffer: BytesMut::with_capacity(capacity), alignment: crate::constants::DEFAULT_ALIGNMENT, } } /// Serialize data to bytes without copying #[instrument(skip(self, data))] pub fn serialize( &mut self, data: &impl for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, Failure>>, ) -> ZeroCopyResult<Bytes> { self.buffer.clear(); let bytes = to_bytes::<Failure>(data).map_err(ZeroCopyError::Serialization)?; // Ensure proper alignment let aligned_len = align_up(bytes.len(), self.alignment); self.buffer.extend_from_slice(&bytes); self.buffer.resize(aligned_len, 0); Ok(self.buffer.split().freeze()) } /// Serialize with validation #[instrument(skip(self, data))] pub fn serialize_validated<T>(&mut self, data: &T) -> ZeroCopyResult<Bytes> where T: Archive + for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, Failure>>, T::Archived: for<'a> bytecheck::CheckBytes<HighValidator<'a, Failure>>, { let bytes = self.serialize(data)?; // Validate the serialized data let _archived = access::<T::Archived, Failure>(&bytes) .map_err(|e| 
ZeroCopyError::Validation(format!("Validation failed: {:?}", e)))?; Ok(bytes) } } impl Default for ZeroCopySerializer { fn default() -> Self { Self::new() } } /// Zero-copy deserializer that provides direct access to archived data #[derive(Debug)] pub struct ZeroCopyDeserializer { // Buffer to hold shared references _buffers: Vec<Arc<Bytes>>, } impl ZeroCopyDeserializer { pub fn new() -> Self { Self { _buffers: Vec::new(), } } /// Access archived data directly without deserialization #[instrument(skip(data))] pub fn access<'a, T>(&self, data: &'a [u8]) -> ZeroCopyResult<&'a T::Archived> where T: Archive, T::Archived: for<'b> bytecheck::CheckBytes<HighValidator<'b, Failure>>, { access::<T::Archived, Failure>(data) .map_err(|e| ZeroCopyError::ArchiveAccess(format!("Access failed: {:?}", e))) } /// Access archived data without validation (unsafe but fast) #[instrument(skip(data))] pub fn access_unchecked<'a, T>(&self, data: &'a [u8]) -> &'a T::Archived where T: Archive, T::Archived: rkyv::Portable, { unsafe { access_unchecked::<T::Archived>(data) } } /// Deserialize to owned data when necessary #[instrument(skip(self, data))] pub fn deserialize<T>(&self, data: &[u8]) -> ZeroCopyResult<T> where T: Archive, T::Archived: for<'a> bytecheck::CheckBytes<HighValidator<'a, Failure>> + Deserialize<T, HighDeserializer<Failure>>, { from_bytes::<T, Failure>(data).map_err(ZeroCopyError::Serialization) } } impl Default for ZeroCopyDeserializer { fn default() -> Self { Self::new() } } /// Shared buffer pool for zero-copy operations #[derive(Debug)] pub struct BytesBufferPool { buffers: crossbeam_queue::SegQueue<BytesMut>, default_capacity: usize, } impl BytesBufferPool { pub fn new(default_capacity: usize) -> Self { Self { buffers: crossbeam_queue::SegQueue::new(), default_capacity, } } /// Get a buffer from the pool or create a new one pub fn get(&self) -> BytesMut { self.buffers .pop() .unwrap_or_else(|| BytesMut::with_capacity(self.default_capacity)) } /// Return a buffer to the 
pool pub fn put(&self, mut buffer: BytesMut) { buffer.clear(); if buffer.capacity() <= self.default_capacity * 2 { self.buffers.push(buffer); } // Drop oversized buffers to prevent memory bloat } } /// Zero-copy container that can hold either owned or borrowed data pub enum ZeroCopyData<T> { Owned(T), Archived(Bytes, std::marker::PhantomData<T>), } impl<T> ZeroCopyData<T> where T: Archive, { /// Create from owned data pub fn owned(data: T) -> Self { Self::Owned(data) } /// Create from archived bytes pub fn archived(bytes: Bytes) -> Self { Self::Archived(bytes, std::marker::PhantomData) } /// Access the data (either owned or archived) pub fn access(&self) -> ZeroCopyResult<ZeroCopyDataRef<'_, T>> where T::Archived: for<'a> bytecheck::CheckBytes<HighValidator<'a, Failure>>, { match self { Self::Owned(data) => Ok(ZeroCopyDataRef::Owned(data)), Self::Archived(bytes, _) => { let archived = access::<T::Archived, Failure>(bytes) .map_err(|e| ZeroCopyError::ArchiveAccess(format!("Access failed: {:?}", e)))?; Ok(ZeroCopyDataRef::Archived(archived)) } } } } /// Reference to either owned or archived data pub enum ZeroCopyDataRef<'a, T: Archive> { Owned(&'a T), Archived(&'a <T as Archive>::Archived), } /// Helper function to align size up to boundary fn align_up(size: usize, alignment: usize) -> usize { (size + alignment - 1) & !(alignment - 1) } /// Streaming serializer for large datasets #[derive(Debug)] pub struct StreamingSerializer { serializer: ZeroCopySerializer, chunk_size: usize, } impl StreamingSerializer { pub fn new(chunk_size: usize) -> Self { Self { serializer: ZeroCopySerializer::with_capacity(chunk_size), chunk_size, } } /// Serialize items in chunks pub fn serialize_chunks<T, I>(&mut self, items: I) -> ZeroCopyResult<Vec<Bytes>> where I: IntoIterator<Item = T>, T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, Failure>>, { let mut chunks = Vec::new(); let mut chunk = Vec::new(); let mut current_size = 0; for item in items { let item_bytes = 
to_bytes::<Failure>(&item).map_err(ZeroCopyError::Serialization)?; if current_size + item_bytes.len() > self.chunk_size && !chunk.is_empty() { // Serialize current chunk let chunk_bytes = self.serializer.serialize(&chunk)?; chunks.push(chunk_bytes); chunk.clear(); current_size = 0; } chunk.push(item); current_size += item_bytes.len(); } // Serialize remaining items if !chunk.is_empty() { let chunk_bytes = self.serializer.serialize(&chunk)?; chunks.push(chunk_bytes); } Ok(chunks) } } #[cfg(test)] mod tests { use super::*; use rkyv::{Archive, Deserialize, Serialize}; #[derive(Archive, Serialize, Deserialize, Debug, PartialEq, Clone)] struct TestData { id: u64, name: String, values: Vec<i32>, } #[test] fn test_zero_copy_serializer() { let mut serializer = ZeroCopySerializer::new(); let data = TestData { id: 42, name: "test".to_string(), values: vec![1, 2, 3, 4, 5], }; let bytes = serializer.serialize(&data).unwrap(); assert!(!bytes.is_empty()); // Verify we can deserialize let deserializer = ZeroCopyDeserializer::new(); let archived = deserializer.access::<TestData>(&bytes).unwrap(); assert_eq!(archived.id, 42); assert_eq!(archived.name, "test"); assert_eq!(archived.values.len(), 5); } #[test] fn test_buffer_pool() { let pool = BytesBufferPool::new(1024); let buf1 = pool.get(); assert_eq!(buf1.capacity(), 1024); pool.put(buf1); let buf2 = pool.get(); assert_eq!(buf2.capacity(), 1024); } #[test] fn test_zero_copy_data() { let data = TestData { id: 123, name: "example".to_string(), values: vec![10, 20, 30], }; // Test owned data let owned = ZeroCopyData::owned(data.clone()); let owned_ref = owned.access().unwrap(); match owned_ref { ZeroCopyDataRef::Owned(d) => assert_eq!(d.id, 123), _ => panic!("Expected owned data"), } // Test archived data let bytes = to_bytes::<Failure>(&data).unwrap(); let archived = ZeroCopyData::<TestData>::archived(Bytes::from(bytes.to_vec())); let archived_ref = archived.access().unwrap(); match archived_ref { ZeroCopyDataRef::Archived(d) => 
assert_eq!(d.id, 123), _ => panic!("Expected archived data"), } } #[test] fn test_streaming_serializer() { let mut streaming = StreamingSerializer::new(1024); // Use a simple type to ensure Serialize bounds are satisfied across rkyv versions let items: Vec<u64> = (0..10).collect(); let chunks = streaming.serialize_chunks(items).unwrap(); assert!(!chunks.is_empty()); // Verify we can access each chunk let deserializer = ZeroCopyDeserializer::new(); for chunk in chunks { let _archived = deserializer.access::<Vec<u64>>(&chunk).unwrap(); } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.