From ceb46af47bc82171540587c56eb3cdea63373b0e Mon Sep 17 00:00:00 2001 From: Sean Lynch <42618346+swlynch99@users.noreply.github.com> Date: Fri, 22 Nov 2024 17:57:20 -0800 Subject: [PATCH] QCon SF Final Workshop Code This is the final workshop code that we live coded together at the QCon. It contains: - The completed intro program, with bonus --loud flag, - The implementation of the single-threaded SIEVE cache, and, - Our final concurrent SIEVE cache using dashmap and fine-grained locking. --- Cargo.lock | 27 +++++- benchmarking/benches/benchmarks/mod.rs | 8 ++ intro/src/main.rs | 30 ++++++ sieve_cache/Cargo.toml | 1 + sieve_cache/src/concurrent_sieve_cache.rs | 113 ++++++++++++++++++++++ sieve_cache/src/lib.rs | 2 + sieve_cache/src/sieve_cache.rs | 78 ++++++++++++--- sieve_cache/tests/tests.rs | 2 +- 8 files changed, 242 insertions(+), 19 deletions(-) create mode 100644 sieve_cache/src/concurrent_sieve_cache.rs diff --git a/Cargo.lock b/Cargo.lock index 0a22cec..d26767f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -358,6 +358,20 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "debugid" version = "0.8.0" @@ -464,6 +478,12 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.0" @@ -495,7 +515,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.0", ] [[package]] @@ -1049,6 +1069,7 @@ name = "sieve_cache" version = "0.1.0" dependencies = [ "cache", + "dashmap", ] [[package]] @@ -1294,7 +1315,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/benchmarking/benches/benchmarks/mod.rs b/benchmarking/benches/benchmarks/mod.rs index 58abbfd..7c73f63 100644 --- a/benchmarking/benches/benchmarks/mod.rs +++ b/benchmarking/benches/benchmarks/mod.rs @@ -4,6 +4,7 @@ use kcache::{KCache, SharableKCache}; use lru_cache::LruCache; use multi_thread_cache_test::benchmark_cache_multi_threaded; use pprof::criterion::{Output, PProfProfiler}; +use sieve_cache::ConcurrentSieveCache; use single_thread_cache_test::benchmark_cache_single_threaded; mod kcache; @@ -70,6 +71,13 @@ fn multi_threaded_comparison(c: &mut Criterion) { thread_count, LruCache::new(), ); + + benchmark_cache_multi_threaded( + BenchmarkId::new("concurrent_sieve", thread_count), + &mut multi_thread_benchmark_group, + thread_count, + ConcurrentSieveCache::new(), + ); } } diff --git a/intro/src/main.rs b/intro/src/main.rs index e3c5fb4..d3539e8 100644 --- a/intro/src/main.rs +++ b/intro/src/main.rs @@ -1,10 +1,40 @@ +use clap::Parser; + +#[derive(Debug, Parser)] +struct Args { + /// This is the input. + input: String, + + /// Shout out the input. + #[arg(long)] + loud: bool, +} + fn main() { // This is a convenient way to set up the normal Rust ecosystem logging. env_logger::init(); // Parse your arguments. + let args = Args::parse(); // Handle your argument - numbers versus strings. + let input = args.input; + let result = input.parse::(); + + match result { + Ok(value) => { + println!("{}", value * 2); + } + Err(_) => { + let input = if args.loud { + input.to_uppercase() + } else { + input + }; + + println!("You said \"{}\"", input); + } + } // Print the result. } diff --git a/sieve_cache/Cargo.toml b/sieve_cache/Cargo.toml index 5b23520..c18f04e 100644 --- a/sieve_cache/Cargo.toml +++ b/sieve_cache/Cargo.toml @@ -8,3 +8,4 @@ bench = false [dependencies] cache = { workspace = true } +dashmap = "6.1.0" diff --git a/sieve_cache/src/concurrent_sieve_cache.rs b/sieve_cache/src/concurrent_sieve_cache.rs new file mode 100644 index 0000000..6f94dc9 --- /dev/null +++ b/sieve_cache/src/concurrent_sieve_cache.rs @@ -0,0 +1,113 @@ +use std::hash::Hash; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; + +use cache::ShareableCache; +use dashmap::DashMap; + +struct Entry { + data: Mutex>, + visited: AtomicBool, +} + +pub struct ConcurrentSieveCache { + data: Vec>, + map: dashmap::DashMap, + hand: Mutex, +} + +impl ConcurrentSieveCache +where + K: Send + Sync + Eq + Hash + Clone, + V: Send + Sync + Clone, +{ + pub fn new() -> Self { + let mut data = Vec::with_capacity(cache::MAX_SIZE); + for _ in 0..cache::MAX_SIZE { + data.push(Entry { + data: Mutex::new(None), + visited: AtomicBool::new(false), + }); + } + + Self { + data, + map: DashMap::new(), + hand: Mutex::new(0), + } + } +} + +impl ConcurrentSieveCache +where + K: Send + Sync + Eq + Hash + Clone, + V: Send + Sync + Clone, +{ + fn evict(&self) -> usize { + let mut hand = self.hand.lock().unwrap(); + + loop { + let current = *hand; + *hand += 1; + if *hand >= self.data.len() { + *hand = 0; + } + + let entry = &self.data[current]; + let visited = entry.visited.swap(false, Ordering::Relaxed); + + if visited { + continue; + } + + return current; + } + } +} + +impl ShareableCache for ConcurrentSieveCache +where + K: Send + Sync + Eq + Hash + Clone, + V: Send + Sync + Clone, +{ + fn get(&self, key: &K) -> Option { + let index = *self.map.get(key)?; + let entry = &self.data[index]; + + entry.visited.store(true, Ordering::Relaxed); + let lock = entry.data.lock().expect("mutex was poisoned"); + let (_, value) = lock.as_ref()?; + + Some(value.clone()) + } + + fn set(&self, key: K, value: V) { + let index = self.evict(); + let entry = &self.data[index]; + + let mut data = entry.data.lock().unwrap(); + + if let Some((key, _)) = data.take() { + self.map.remove(&key); + } + + let prev = self.map.insert(key.clone(), index); + *data = Some((key, value)); + + drop(data); + + if let Some(prev) = prev { + *self.data[prev].data.lock().unwrap() = None; + } + } +} + +impl Default for ConcurrentSieveCache +where + K: Send + Sync + Eq + Hash + Clone, + V: Send + Sync + Clone, +{ + fn default() -> Self { + Self::new() + } +} diff --git a/sieve_cache/src/lib.rs b/sieve_cache/src/lib.rs index c212803..265b4ae 100644 --- a/sieve_cache/src/lib.rs +++ b/sieve_cache/src/lib.rs @@ -1,3 +1,5 @@ +mod concurrent_sieve_cache; mod sieve_cache; +pub use concurrent_sieve_cache::ConcurrentSieveCache; pub use sieve_cache::SieveCache; diff --git a/sieve_cache/src/sieve_cache.rs b/sieve_cache/src/sieve_cache.rs index a0c5937..00a7fc9 100644 --- a/sieve_cache/src/sieve_cache.rs +++ b/sieve_cache/src/sieve_cache.rs @@ -1,16 +1,59 @@ +use std::collections::HashMap; +use std::hash::Hash; + use cache::SizeLimitedCache; +struct Entry { + data: Option<(Key, Value)>, + visited: bool, +} + pub struct SieveCache { - /// This is a placeholder to allow the code to compile in a work-in-progress state. - /// You'll remove this field when you choose a data structure to hold the raw cache - /// values. - _phantom: std::marker::PhantomData<(Key, Value)>, + map: HashMap, + data: Vec>, + hand: usize, } impl SieveCache { pub fn new() -> Self { + let mut data = Vec::with_capacity(cache::MAX_SIZE); + for _ in 0..cache::MAX_SIZE { + data.push(Entry { + data: None, + visited: false, + }); + } + Self { - _phantom: std::marker::PhantomData, + data, + map: HashMap::default(), + hand: 0, + } + } +} + +impl SieveCache +where + Key: Eq + Hash, +{ + fn evict(&mut self) -> usize { + loop { + let current = self.hand; + self.hand += 1; + if self.hand >= self.data.len() { + self.hand = 0; + } + + let entry = &mut self.data[current]; + if std::mem::replace(&mut entry.visited, false) { + continue; + } + + if let Some((key, _)) = entry.data.take() { + self.map.remove(&key); + } + + return current; } } } @@ -22,21 +65,26 @@ where Value: Clone, { fn get(&mut self, key: &Key) -> Option { - // These silence unused variable warnings. Delete them before you - // implement this method. - let _ = key; + let index = *self.map.get(key)?; + let entry = &mut self.data[index]; - // todo!() - None + entry.visited = true; + let (_, value) = entry.data.as_ref()?; + + Some(value.clone()) } fn set(&mut self, key: Key, value: Value) { - // These silence unused variable warnings. Delete them before you - // implement this method. - let _ = key; - let _ = value; + if let Some(index) = self.map.get(&key) { + let entry = &mut self.data[*index]; + entry.data = Some((key, value)); + return; + } - // todo!() + let index = self.evict(); + let entry = &mut self.data[index]; + entry.data = Some((key.clone(), value)); + self.map.insert(key, index); } } diff --git a/sieve_cache/tests/tests.rs b/sieve_cache/tests/tests.rs index a0aef99..b67862b 100644 --- a/sieve_cache/tests/tests.rs +++ b/sieve_cache/tests/tests.rs @@ -107,7 +107,7 @@ fn evict_skips_read_values() { #[test] fn evict_only_evicts_necessary_entries() { let mut cache = SieveCache::new(); - + // This should fill up the cache to the maximum size. for i in 0..cache::MAX_SIZE { cache.set(i, i);