Skip to main content
Glama
concurrency.rs7.97 kB
// concurrency.rs - Demonstrates Rust concurrency patterns

use std::sync::{Arc, Mutex, RwLock, mpsc};
use std::thread;
use std::time::Duration;

/// Simple thread spawning
///
/// Spawns one thread that prints the numbers 1 through 4, sleeping 10 ms
/// after each, and blocks until that thread has finished.
///
/// # Panics
///
/// Panics if the spawned thread panicked.
pub fn spawn_simple_thread() {
    let handle = thread::spawn(|| {
        for i in 1..5 {
            println!("Thread: {}", i);
            thread::sleep(Duration::from_millis(10));
        }
    });

    handle.join().unwrap();
}

/// Thread with move closure
///
/// Moves a vector into a spawned thread, which prints it and returns the sum
/// of its elements; the calling thread joins and prints that sum.
///
/// # Panics
///
/// Panics if the spawned thread panicked.
pub fn thread_with_ownership() {
    let data = vec![1, 2, 3, 4, 5];

    // `move` transfers ownership of `data` into the closure.
    let handle = thread::spawn(move || {
        println!("Thread got data: {:?}", data);
        data.iter().sum::<i32>()
    });

    let result = handle.join().unwrap();
    println!("Sum: {}", result);
}

/// Multiple threads
///
/// Spawns `count` threads; thread `i` sleeps 10 ms and returns `i * 2`.
/// The results are collected in spawn order.
///
/// # Panics
///
/// Panics if any spawned thread panicked.
pub fn spawn_multiple_threads(count: usize) -> Vec<i32> {
    let mut handles = vec![];

    for i in 0..count {
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            i as i32 * 2
        });
        handles.push(handle);
    }

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

// Mutex for shared state

/// Shared counter with Mutex
///
/// Spawns `thread_count` threads that each increment a shared counter 100
/// times, then returns the final value of the counter.
///
/// # Panics
///
/// Panics if a spawned thread panicked or the mutex is poisoned.
pub fn shared_counter(thread_count: usize) -> i32 {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..thread_count {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for _ in 0..100 {
                let mut num = counter.lock().unwrap();
                *num += 1;
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // Bind the value first so the lock guard is released before `counter`
    // goes out of scope at the end of the function.
    let final_count = *counter.lock().unwrap();
    final_count
}

/// Safe data structure with Mutex
///
/// A counter whose value lives behind an `Arc<Mutex<_>>`; handles created
/// with [`SafeCounter::clone_counter`] share the same underlying value.
#[derive(Debug)]
pub struct SafeCounter {
    count: Arc<Mutex<i32>>,
}

impl SafeCounter {
    /// Creates a counter starting at zero.
    pub fn new() -> Self {
        Self {
            count: Arc::new(Mutex::new(0)),
        }
    }

    /// Adds one to the counter.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    pub fn increment(&self) {
        let mut count = self.count.lock().unwrap();
        *count += 1;
    }

    /// Returns the current value of the counter.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    pub fn get(&self) -> i32 {
        *self.count.lock().unwrap()
    }

    /// Returns a new handle that shares this counter's value.
    pub fn clone_counter(&self) -> SafeCounter {
        SafeCounter {
            count: Arc::clone(&self.count),
        }
    }
}

impl Default for SafeCounter {
    fn default() -> Self {
        Self::new()
    }
}

// RwLock for read-heavy workloads

/// Shared data with RwLock
///
/// A vector of `i32` behind an `Arc<RwLock<_>>`: `add` takes the write lock,
/// while `get`, `len` and `is_empty` take the read lock.
#[derive(Debug)]
pub struct SharedData {
    data: Arc<RwLock<Vec<i32>>>,
}

impl SharedData {
    /// Creates an empty collection.
    pub fn new() -> Self {
        Self {
            data: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Appends `value` to the end of the collection.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    pub fn add(&self, value: i32) {
        let mut data = self.data.write().unwrap();
        data.push(value);
    }

    /// Returns a copy of the element at `index`, or `None` if out of range.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    pub fn get(&self, index: usize) -> Option<i32> {
        let data = self.data.read().unwrap();
        data.get(index).copied()
    }

    /// Returns the number of stored elements.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    pub fn len(&self) -> usize {
        let data = self.data.read().unwrap();
        data.len()
    }

    /// Returns `true` if no elements are stored.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    pub fn is_empty(&self) -> bool {
        self.data.read().unwrap().is_empty()
    }

    /// Returns a new handle that shares this collection's storage.
    pub fn clone_data(&self) -> SharedData {
        SharedData {
            data: Arc::clone(&self.data),
        }
    }
}

impl Default for SharedData {
    fn default() -> Self {
        Self::new()
    }
}

// Message passing with channels

/// Simple channel communication
///
/// A spawned thread sends three string messages over a channel, sleeping
/// 10 ms after each; the calling thread prints every message it receives.
/// The receive loop ends once the sending thread finishes and its sender is
/// dropped.
pub fn simple_channel() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let messages = vec!["hello", "from", "thread"];
        for msg in messages {
            tx.send(msg).unwrap();
            thread::sleep(Duration::from_millis(10));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

/// Multiple producers
///
/// Spawns three threads that each send one message through a clone of the
/// same sender, and returns the messages in the order they were received
/// (which is not necessarily spawn order).
pub fn multiple_producers() -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let mut results = Vec::new();

    for i in 0..3 {
        let tx_clone = tx.clone();
        thread::spawn(move || {
            let message = format!("Message from thread {}", i);
            tx_clone.send(message).unwrap();
        });
    }

    drop(tx); // Drop original sender so the receive loop below can terminate

    for received in rx {
        results.push(received);
    }

    results
}

/// Worker pool pattern
///
/// A fixed-size pool of worker threads that pull boxed closures from a shared
/// channel. Dropping the pool closes the channel, lets the workers finish the
/// jobs already queued, and joins every worker thread.
pub struct ThreadPool {
    workers: Vec<Worker>,
    // `Option` so that `Drop` can take and drop the sender *before* joining
    // the workers; it is `Some` for the whole usable lifetime of the pool.
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    /// Spawns a worker thread that runs jobs from `receiver` until the
    /// channel is disconnected.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The lock guard is a temporary of this statement, so the mutex
            // is released before the job runs and other workers can receive.
            let job = receiver.lock().unwrap().recv();

            match job {
                Ok(job) => {
                    println!("Worker {} executing job", id);
                    job();
                }
                Err(_) => {
                    println!("Worker {} shutting down", id);
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

impl ThreadPool {
    /// Creates a pool with `size` worker threads.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    /// Queues `f` to be run by one of the worker threads.
    ///
    /// # Panics
    ///
    /// Panics if the job cannot be sent because the receiving side of the
    /// channel no longer exists.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender
            .as_ref()
            .expect("sender is only removed while the pool is being dropped")
            .send(job)
            .unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Close the channel first. Without this the workers block in `recv()`
        // forever and the joins below never return (deadlock on drop).
        drop(self.sender.take());

        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

// Barrier synchronization

use std::sync::Barrier;

/// Spawns three threads that each print a message, wait on a shared
/// three-party barrier, and print a second message once all have arrived.
///
/// # Panics
///
/// Panics if any spawned thread panicked.
pub fn barrier_example() {
    let barrier = Arc::new(Barrier::new(3));
    let mut handles = vec![];

    for i in 0..3 {
        let barrier = Arc::clone(&barrier);
        let handle = thread::spawn(move || {
            println!("Thread {} before barrier", i);
            barrier.wait();
            println!("Thread {} after barrier", i);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

// Atomic operations

use std::sync::atomic::{AtomicUsize, Ordering};

/// A counter backed by an `AtomicUsize`, updated without a lock.
#[derive(Debug)]
pub struct AtomicCounter {
    count: AtomicUsize,
}

impl AtomicCounter {
    /// Creates a counter starting at zero.
    pub fn new() -> Self {
        Self {
            count: AtomicUsize::new(0),
        }
    }

    /// Atomically adds one to the counter.
    pub fn increment(&self) {
        self.count.fetch_add(1, Ordering::SeqCst);
    }

    /// Returns the current value of the counter.
    pub fn get(&self) -> usize {
        self.count.load(Ordering::SeqCst)
    }
}

impl Default for AtomicCounter {
    fn default() -> Self {
        Self::new()
    }
}

// Once initialization

use std::sync::OnceLock;

// `OnceLock` (Rust 1.70+) gives one-time, thread-safe initialization without
// `static mut` or `unsafe`.
static SINGLETON: OnceLock<String> = OnceLock::new();

/// Returns a reference to a process-wide string that is initialized on first
/// call; every call returns the same instance.
pub fn get_singleton() -> &'static String {
    SINGLETON.get_or_init(|| String::from("Singleton instance"))
}

// Scoped threads (Rust 1.63+)

/// Borrows a local vector from two scoped threads that each print it, then
/// mutates and prints the vector again after the scope has ended.
pub fn scoped_threads() {
    let mut data = vec![1, 2, 3, 4, 5];

    thread::scope(|s| {
        s.spawn(|| {
            println!("Thread 1 can access data: {:?}", data);
        });

        s.spawn(|| {
            println!("Thread 2 can also access data: {:?}", data);
        });
    });

    data.push(6); // Can use data after scope
    println!("Final data: {:?}", data);
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_shared_counter() {
        let result = shared_counter(5);
        assert_eq!(result, 500);
    }

    #[test]
    fn test_safe_counter() {
        let counter = SafeCounter::new();
        counter.increment();
        counter.increment();
        assert_eq!(counter.get(), 2);
    }

    #[test]
    fn test_shared_data() {
        let data = SharedData::new();
        data.add(1);
        data.add(2);
        data.add(3);
        assert_eq!(data.len(), 3);
        assert_eq!(data.get(1), Some(2));
    }

    #[test]
    fn test_atomic_counter() {
        let counter = AtomicCounter::new();
        counter.increment();
        counter.increment();
        assert_eq!(counter.get(), 2);
    }

    #[test]
    fn test_thread_pool_runs_jobs_and_shuts_down() {
        let counter = Arc::new(AtomicCounter::new());
        {
            let pool = ThreadPool::new(2);
            for _ in 0..4 {
                let counter = Arc::clone(&counter);
                pool.execute(move || counter.increment());
            }
            // Dropping the pool here must join the workers, not hang.
        }
        assert_eq!(counter.get(), 4);
    }

    #[test]
    fn test_singleton_is_shared() {
        assert_eq!(get_singleton().as_str(), "Singleton instance");
        assert!(std::ptr::eq(get_singleton(), get_singleton()));
    }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Shashankss1205/CodeGraphContext'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.