Skip to main content
Glama

CodeGraph CLI MCP Server

by Jakedismo
spsc.rs (5.6 kB)
use core::cell::UnsafeCell; use core::mem::MaybeUninit; use core::sync::atomic::{AtomicUsize, Ordering}; use crossbeam_utils::CachePadded; use thiserror::Error; #[derive(Debug, Error)] pub enum SpscError { #[error("queue is full")] Full, #[error("queue is empty")] Empty, } /// Wait-free SPSC bounded ring buffer queue. /// /// - One dedicated producer, one dedicated consumer /// - Uses Acquire/Release ordering to ensure correctness /// - Capacity must be a power of two for fast modulo pub struct WaitFreeSpscQueue<T> { mask: usize, head: CachePadded<AtomicUsize>, tail: CachePadded<AtomicUsize>, buf: Box<[UnsafeCell<MaybeUninit<T>>]>, } // Safety: Producer and consumer operate on disjoint indices; T must be Send unsafe impl<T: Send> Send for WaitFreeSpscQueue<T> {} unsafe impl<T: Send> Sync for WaitFreeSpscQueue<T> {} impl<T> WaitFreeSpscQueue<T> { /// Create a new queue with the given capacity. Capacity is rounded up to the next power of two. pub fn with_capacity(cap: usize) -> (Producer<T>, Consumer<T>) { assert!(cap > 1, "capacity must be > 1"); let capacity = cap.next_power_of_two(); let mask = capacity - 1; let mut vec = Vec::with_capacity(capacity); for _ in 0..capacity { vec.push(UnsafeCell::new(MaybeUninit::uninit())); } let q = WaitFreeSpscQueue { mask, head: CachePadded::new(AtomicUsize::new(0)), tail: CachePadded::new(AtomicUsize::new(0)), buf: vec.into_boxed_slice(), }; let shared = Box::new(q); // Use raw pointer across Producer/Consumer; both wrap Arc-like ownership via Box leak let ptr = Box::into_raw(shared); (Producer { inner: ptr }, Consumer { inner: ptr }) } } impl<T> WaitFreeSpscQueue<T> { #[inline] fn capacity(&self) -> usize { self.mask + 1 } #[inline] fn slot(&self, idx: usize) -> &UnsafeCell<MaybeUninit<T>> { &self.buf[idx & self.mask] } } /// Producer side of the SPSC queue. pub struct Producer<T> { pub(crate) inner: *mut WaitFreeSpscQueue<T>, } /// Consumer side of the SPSC queue. 
pub struct Consumer<T> { pub(crate) inner: *mut WaitFreeSpscQueue<T>, } impl<T> Producer<T> { #[inline] pub fn try_push(&self, value: T) -> Result<(), SpscError> { let q = unsafe { &*self.inner }; let tail = q.tail.load(Ordering::Relaxed); let head = q.head.load(Ordering::Acquire); if tail.wrapping_sub(head) == q.capacity() - 1 { // full: keep one slot open return Err(SpscError::Full); } let slot = q.slot(tail); unsafe { (*slot.get()).write(value) }; q.tail.store(tail.wrapping_add(1), Ordering::Release); Ok(()) } } impl<T> Consumer<T> { #[inline] pub fn try_pop(&self) -> Result<T, SpscError> { let q = unsafe { &*self.inner }; let head = q.head.load(Ordering::Relaxed); let tail = q.tail.load(Ordering::Acquire); if head == tail { return Err(SpscError::Empty); } let slot = q.slot(head); let val = unsafe { (*slot.get()).assume_init_read() }; q.head.store(head.wrapping_add(1), Ordering::Release); Ok(val) } } impl<T> Drop for Producer<T> { fn drop(&mut self) { // SAFETY: Only free when both producer and consumer have been dropped. // Here we leak and let the consumer drop free the memory. 
} } impl<T> Drop for Consumer<T> { fn drop(&mut self) { unsafe { // Drain remaining elements to drop T properly let q = &*self.inner; while q.head.load(Ordering::Relaxed) != q.tail.load(Ordering::Relaxed) { let _ = self.try_pop(); } drop(Box::from_raw(self.inner)); } } } #[cfg(test)] mod tests { use super::*; use std::thread; use std::time::Duration; #[test] fn spsc_basic() { let (p, c) = WaitFreeSpscQueue::with_capacity(8); p.try_push(1).unwrap(); p.try_push(2).unwrap(); assert_eq!(c.try_pop().unwrap(), 1); assert_eq!(c.try_pop().unwrap(), 2); assert!(matches!(c.try_pop(), Err(SpscError::Empty))); } #[test] fn spsc_concurrent() { let (p, c) = WaitFreeSpscQueue::with_capacity(1024); let t = thread::spawn(move || { for i in 0..10_000u32 { loop { if p.try_push(i).is_ok() { break; } } } }); let mut count = 0u32; while count < 10_000 { if let Ok(v) = c.try_pop() { assert_eq!(v, count); count += 1; } else { thread::yield_now(); } } t.join().unwrap(); } #[cfg(feature = "loom")] mod loom_tests { use super::*; use loom::sync::atomic::Ordering; use loom::thread; // A small loom test to explore ordering; not exhaustive #[test] fn loom_spsc() { loom::model(|| { let (p, c) = WaitFreeSpscQueue::with_capacity(2); let tp = thread::spawn(move || { p.try_push(1).ok(); }); let tc = thread::spawn(move || { let _ = c.try_pop(); }); tp.join().unwrap(); tc.join().unwrap(); }); } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.