diff --git a/Cargo.toml b/Cargo.toml index 3c21d40..e299aca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ serde = { version = "1.0.131", features = ["derive"] } serde_json = { version = "1.0", default-features = true, features = ["alloc"] } anyhow = "1.0.31" thiserror = "1.0" -tokio = { version = "1.16.1", features = ["full"] } +tokio = { version = "1.16.1", features = ["full", "tracing"] } byteorder = "1.4.3" rand = "0.8.5" maligned = "0.2.1" @@ -47,6 +47,7 @@ eieio = "1.0.0" either = "1.8.1" enum-unitary = "0.5.0" atom_box = "0.1.2" +console-subscriber = "0.1.8" [dev-dependencies] tracing-subscriber = "0.3.16" tracing-log = "0.1.3" @@ -54,3 +55,5 @@ chrono = "0.4.22" env_logger = "0.9.1" console_log = { version = "0.2.0", features = ["color"] } +[build] +rustflags = ["--cfg", "tokio_unstable"] \ No newline at end of file diff --git a/src/db.rs b/src/db.rs deleted file mode 100644 index 00f7a9c..0000000 --- a/src/db.rs +++ /dev/null @@ -1,18 +0,0 @@ -use crate::kv::KV; -use crate::options::Options; -use crate::types::{XArc, XWeak}; - -pub struct DataBase { - kv: XArc, - VL: Option, -} - -impl DataBase { - async fn new() { - let kv = KV::open(Options::default()).await; - } -} - -pub struct VL { - kv: XWeak, -} diff --git a/src/iterator.rs b/src/iterator.rs index 07e731f..188aedd 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -1,13 +1,13 @@ use crate::iterator::PreFetchStatus::Prefetched; use crate::kv::_BADGER_PREFIX; -use crate::types::{ArcMx, ArcRW, Closer, TArcMx, TArcRW}; -use crate::MergeIterOverIterator; +use crate::types::{ArcMx, ArcRW, Channel, Closer, TArcMx, TArcRW}; use crate::{ kv::KV, types::XArc, value_log::{MetaBit, ValuePointer}, Decode, Result, Xiterator, }; +use crate::{MergeIterOverIterator, ValueStruct}; use log::Metadata; use parking_lot::RwLock; use std::future::Future; @@ -15,6 +15,8 @@ use std::pin::Pin; use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; use std::{io::Cursor, sync::atomic::AtomicU64}; +use tokio::io::AsyncWriteExt; +use tokio::time::Sleep; #[derive(Debug, PartialEq)] pub(crate) enum PreFetchStatus { @@ -22,7 +24,7 @@ pub(crate) enum PreFetchStatus { Prefetched, } -type KVItem = TArcRW; +pub(crate) type KVItem = TArcRW; // Returned during iteration. Both the key() and value() output is only valid until // iterator.next() is called. @@ -41,11 +43,39 @@ pub(crate) struct KVItemInner { } impl KVItemInner { + pub(crate) fn new(key: Vec, value: ValueStruct, kv: XArc) -> KVItemInner { + Self { + status: Arc::new(tokio::sync::RwLock::new(PreFetchStatus::Empty)), + kv, + key, + value: Arc::new(Default::default()), + vptr: value.value, + meta: value.meta, + user_meta: value.user_meta, + cas_counter: Arc::new(AtomicU64::new(value.cas_counter)), + wg: Closer::new("kv".to_owned()), + err: Ok(()), + } + } + // Returns the key. Remember to copy if you need to access it outside the iteration loop. pub(crate) fn key(&self) -> &[u8] { &self.key } + pub async fn get_value(&self) -> Result> { + let ch = Channel::new(1); + self.value(|value| { + let tx = ch.tx(); + Box::pin(async move { + tx.send(value).await.unwrap(); + Ok(()) + }) + }) + .await?; + Ok(ch.recv().await.unwrap()) + } + // Value retrieves the value of the item from the value log. It calls the // consumer function with a slice argument representing the value. In case // of error, the consumer function is not called. 
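A note on the new async value path in src/iterator.rs above: `KVItemInner::get_value()` bridges the callback-style `value()` consumer with a plain awaitable return by shipping the bytes through a one-slot `Channel`. The sketch below is a hedged illustration of the higher-level API this enables (`set`/`get` on the opened store, as added in src/kv.rs later in this diff); the helpers `random_tmp_dir` and `get_test_option` are assumed from src/kv_test.rs and are not defined here.

```rust
// Hedged usage sketch, not part of the diff: round-trip through the new async
// set/get path. `KV::open` returns the Arc-wrapped store (XArc<KV> / ArcKV).
#[tokio::test]
async fn set_then_get_roundtrip() {
    let dir = random_tmp_dir();                       // assumed test helper
    let kv = KV::open(get_test_option(&dir)).await.unwrap();
    // set() funnels through batch_set -> write_ch -> write_requests.
    kv.set(b"hello".to_vec(), b"world".to_vec(), 0).await.unwrap();
    // get() wraps the raw ValueStruct in KVItemInner and resolves the bytes
    // asynchronously via get_value()'s internal one-slot channel.
    let got = kv.get(b"hello").await.unwrap();
    assert_eq!(got, b"world".to_vec());
}
```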
diff --git a/src/kv.rs b/src/kv.rs index 55275b0..24c23d3 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -1,13 +1,13 @@ -use crate::iterator::{IteratorExt, IteratorOptions, KVItemInner}; +use crate::iterator::{IteratorExt, IteratorOptions, KVItem, KVItemInner}; use crate::levels::{LevelsController, XLevelsController}; use crate::manifest::{open_or_create_manifest_file, Manifest, ManifestFile}; use crate::options::Options; use crate::table::builder::Builder; use crate::table::iterator::{IteratorImpl, IteratorItem}; use crate::table::table::{new_file_name, Table, TableCore}; -use crate::types::{ArcMx, Channel, Closer, TArcMx, XArc, XWeak}; +use crate::types::{ArcMx, Channel, Closer, TArcMx, TArcRW, XArc, XWeak}; use crate::value_log::{ - ArcRequest, Entry, MetaBit, Request, ValueLogCore, ValuePointer, MAX_KEY_SIZE, + ArcRequest, Entry, EntryType, MetaBit, Request, ValueLogCore, ValuePointer, MAX_KEY_SIZE, }; use crate::y::{ async_sync_directory, create_synced_file, sync_directory, Encode, Result, ValueStruct, @@ -17,13 +17,16 @@ use crate::{ Decode, Error, MergeIterOverBuilder, MergeIterOverIterator, Node, SkipList, SkipListManager, UniIterator, Xiterator, }; +use anyhow::__private::kind::TraitKind; +use async_channel::RecvError; use atomic::Atomic; -use bytes::BufMut; -use crossbeam_epoch::Shared; +use bytes::{Buf, BufMut}; +use crossbeam_epoch::{Owned, Shared}; use drop_cell::defer; use fs2::FileExt; use log::{info, Log}; -use parking_lot::Mutex; +use parking_lot::lock_api::MutexGuard; +use parking_lot::{Mutex, RawMutex}; use std::cell::RefCell; use std::fs::File; use std::fs::{try_exists, OpenOptions}; @@ -39,18 +42,19 @@ use std::time::Duration; use std::{string, vec}; use tokio::fs::create_dir_all; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::sync::{RwLock, RwLockWriteGuard}; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; pub const _BADGER_PREFIX: &[u8; 8] = b"!badger!"; // Prefix for internal keys used by badger. pub const _HEAD: &[u8; 11] = b"!bager!head"; // For Storing value offset for replay. -struct Closers { - update_size: Closer, - compactors: Closer, - mem_table: Closer, // Wait flush job exit - writes: Closer, - value_gc: Closer, +pub struct Closers { + pub update_size: Closer, + pub compactors: Closer, + pub mem_table: Closer, + // Wait flush job exit + pub writes: Closer, + pub value_gc: Closer, } struct FlushTask { @@ -64,6 +68,11 @@ impl FlushTask { } } +pub struct KVBuilder { + opt: Options, + kv: BoxKV, +} + pub struct KV { pub opt: Options, pub vlog: Option, @@ -74,14 +83,14 @@ pub struct KV { // write_chan: Channel, dir_lock_guard: File, value_dir_guard: File, - closers: Closers, + pub closers: Closers, // Our latest (actively written) in-memory table. - mem_st_manger: Arc, + pub mem_st_manger: Arc, // Add here only AFTER pushing to flush_ch - write_ch: Channel, + pub write_ch: Channel, // Incremented in the non-concurrently accessed write loop. But also accessed outside. So // we use an atomic op. 
- last_used_cas_counter: AtomicU64, + pub(crate) last_used_cas_counter: AtomicU64, share_lock: tokio::sync::RwLock<()>, // TODO user ctx replace closer ctx: tokio_context::context::Context, @@ -89,8 +98,15 @@ pub struct KV { } unsafe impl Send for KV {} + unsafe impl Sync for KV {} +impl Drop for KV { + fn drop(&mut self) { + info!("Drop kv"); + } +} + pub struct BoxKV { pub kv: *const KV, } @@ -149,7 +165,7 @@ impl KV { value_dir_guard, closers, write_ch: Channel::new(1), - last_used_cas_counter: Default::default(), + last_used_cas_counter: AtomicU64::new(1), mem_st_manger: Arc::new(SkipListManager::new(opt.arena_size() as usize)), share_lock: tokio::sync::RwLock::new(()), ctx, @@ -178,6 +194,7 @@ impl KV { _out.spawn_update_size().await; }); } + // mem_table closer { let _out = xout.clone(); @@ -189,7 +206,7 @@ impl KV { } // Get the lasted ValueLog Recover Pointer - let item = match xout.get(_HEAD) { + let item = match xout._get(_HEAD) { Err(NotFound) => ValueStruct::default(), // Give it a default value Err(_) => return Err("Retrieving head".into()), Ok(item) => item, @@ -207,15 +224,16 @@ impl KV { if !item.value.is_empty() { vptr.dec(&mut Cursor::new(value))?; } - let replay_closer = Closer::new("tmp_replay".to_owned()); + let replay_closer = Closer::new("tmp_writer_closer".to_owned()); { let _out = xout.clone(); let replay_closer = replay_closer.spawn(); tokio::spawn(async move { - _out.do_writes(replay_closer).await; + _out.do_writes(replay_closer, true).await; }); } + // replay data from vlog let mut first = true; xout.vlog .as_ref() @@ -234,7 +252,7 @@ impl KV { // TODO why? if entry.cas_counter_check != 0 { - let old_value = xout.get(&entry.key)?; + let old_value = xout._get(&entry.key)?; if old_value.cas_counter != entry.cas_counter_check { return Ok(true); } @@ -264,6 +282,7 @@ impl KV { .await?; // Wait for replay to be applied first. replay_closer.signal_and_wait().await; + // Mmap writeable log let max_fid = xout.must_vlog().max_fid.load(Ordering::Relaxed); let lf = xout.must_vlog().pick_log_by_vlog_id(&max_fid); @@ -274,7 +293,7 @@ impl KV { let closer = xout.closers.writes.spawn(); let _out = xout.clone(); tokio::spawn(async move { - _out.do_writes(closer).await; + _out.do_writes(closer, false).await; }); } @@ -285,7 +304,6 @@ impl KV { _out.must_vlog().wait_on_gc(closer).await; }); } - Ok(xout) } } @@ -294,7 +312,7 @@ impl KV { async fn walk_dir(dir: &str) -> Result<(u64, u64)> { let mut lsm_size = 0; let mut vlog_size = 0; - let mut entries = tokio::fs::read_dir("dir").await?; + let mut entries = tokio::fs::read_dir(dir).await?; while let Some(entry) = entries.next_entry().await? { let meta = entry.metadata().await?; if meta.is_dir() { @@ -311,10 +329,10 @@ impl KV { // get returns the value in `mem_table` or disk for given key. // Note that value will include meta byte. - pub(crate) fn get(&self, key: &[u8]) -> Result { + pub(crate) fn _get(&self, key: &[u8]) -> Result { let p = crossbeam_epoch::pin(); let tables = self.get_mem_tables(&p); - + // info!("tabels {}", tables.len()); // TODO add metrics for tb in tables { let vs = unsafe { tb.as_ref().unwrap().get(key) }; @@ -323,7 +341,7 @@ impl KV { } let vs = vs.unwrap(); // TODO why - if vs.meta != 0 && !vs.value.is_empty() { + if vs.meta != 0 || !vs.value.is_empty() { return Ok(vs); } } @@ -331,6 +349,22 @@ impl KV { self.must_lc().get(key).ok_or(NotFound) } + // Sets the provided value for a given key. If key is not present, it is created. 
If it is + // present, the existing value is overwritten with the one provided. + // Along with key and value, Set can also take an optional userMeta byte. This byte is stored + // alongside the key, and can be used as an aid to interpret the value or store other contextual + // bits corresponding to the key-value pair. + pub(crate) async fn set(&self, key: Vec, value: Vec, user_meta: u8) -> Result<()> { + let res = self + .batch_set(vec![Entry::default() + .key(key) + .value(value) + .user_meta(user_meta)]) + .await; + assert_eq!(res.len(), 1); + res[0].to_owned() + } + // Returns the current `mem_tables` and get references. fn get_mem_tables<'a>(&'a self, p: &'a crossbeam_epoch::Guard) -> Vec> { self.mem_st_manger.lock_exclusive(); @@ -356,7 +390,11 @@ impl KV { if reqs.is_empty() { return Ok(()); } - info!("write_requests called. Writing to value log"); + info!( + "write_requests called. Writing to value log, count: {}", + reqs.len() + ); + // CAS counter for all operations has to go onto value log. Otherwise, if it is just in // memtable for a long time, and following CAS operations use that as a check, when // replaying, we will think that these CAS operations should fail, when they are actually @@ -365,19 +403,42 @@ impl KV { // There is code (in flush_mem_table) whose correctness depends on us generating CAS Counter // values _before_ we modify s.vptr here. for req in reqs.iter() { - let entries = req.req_ref().entries.write(); + let entries = &req.req_ref().entries; let counter_base = self.new_cas_counter(entries.len() as u64); for (idx, entry) in entries.iter().enumerate() { - entry.borrow_mut().cas_counter = counter_base + idx as u64; + let mut entry = entry.write().await; + entry.mut_entry().cas_counter = counter_base + idx as u64; + info!("update cas counter: {}", entry.entry().cas_counter); + } + } + + // TODO add error set + if let Err(err) = self.vlog.as_ref().unwrap().write(reqs.clone()).await { + for req in reqs.iter() { + req.set_err(Err(err.clone())).await; } + return Err(err); } - self.vlog.as_ref().unwrap().write(reqs.clone())?; info!("Writing to memory table"); let mut count = 0; for req in reqs.iter() { - count += req.get_req().entries.read().len(); + if req.get_req().entries.is_empty() { + continue; + } + count += req.get_req().entries.len(); + while let Err(err) = self.ensure_room_for_write().await { + tokio::time::sleep(Duration::from_millis(10)).await; + } + info!("waiting for write"); + if let Err(err) = self.write_to_lsm(req.clone()).await { + req.set_err(Err(err)).await; + } else { + req.set_err(Ok(())).await; + } + self.update_offset(req.get_req().ptrs.read().await); } + info!("{} entries written", count); Ok(()) } @@ -440,54 +501,196 @@ impl KV { // for e in entries { // Check(e.Error); // } - // TODO - pub(crate) async fn batch_set(&self, entries: Vec) -> Result> { - let mut bad = vec![]; - let mut b = vec![Request::default()]; + pub(crate) async fn batch_set(&self, entries: Vec) -> Vec> { let mut count = 0; let mut sz = 0u64; - for entry in entries { + let mut res = vec![Ok(()); entries.len()]; + let mut req = Request::default(); + let mut req_index = vec![]; + + for (i, entry) in entries.into_iter().enumerate() { if entry.key.len() > MAX_KEY_SIZE { - bad.push(entry); + res[i] = Err("Key too big".into()); continue; } if entry.value.len() as u64 > self.opt.value_log_file_size { - bad.push(entry); + res[i] = Err("Value to big".into()); continue; } - count += 1; - sz += self.opt.estimate_size(&entry) as u64; - let req = b.last_mut().unwrap(); - 
req.entries.write().push(RefCell::new(entry)); + { + count += 1; + sz += self.opt.estimate_size(&entry) as u64; + req.entries + .push(tokio::sync::RwLock::new(EntryType::from(entry))); + req.ptrs.write().await.push(None); + req_index.push(i); + } + if count >= self.opt.max_batch_count || sz >= self.opt.max_batch_count { - b.push(Request::default()); + assert!(!self.write_ch.is_close()); + info!( + "send tasks to write, entries: {}", + req.entries.len() + ); + let arc_req = ArcRequest::from(req); + self.write_ch.send(arc_req.clone()).await.unwrap(); + { + count = 0; + sz = 0; + for (index, err) in arc_req.req_ref().get_errs().await.into_iter().enumerate() { + let entry_index = req_index[index]; + res[entry_index] = err; + } + req = Request::default(); + req_index.clear(); + } } } - let mut reqs = vec![]; - for req in b { - if req.entries.read().is_empty() { - break; - } + if !req.entries.is_empty() { let arc_req = ArcRequest::from(req); - reqs.push(arc_req.clone()); - self.write_ch.send(arc_req).await.unwrap(); + self.write_ch.send(arc_req.clone()).await.unwrap(); + { + count = 0; + sz = 0; + for (index, err) in arc_req.get_req().get_errs().await.into_iter().enumerate() { + let entry_index = req_index[index]; + res[entry_index] = err; + } + req = Request::default(); + req_index.clear(); + } } - if !bad.is_empty() { - let req = Request::default(); - *req.entries.write() = - Vec::from_iter(bad.into_iter().map(|bad| RefCell::new(bad)).into_iter()); - let arc_req = ArcRequest::from(req); - arc_req - .set_err(Err("key too big or value to big".into())) - .await; - reqs.push(arc_req); + res + } + + async fn write_to_lsm(&self, req: ArcRequest) -> Result<()> { + defer! {info!("exit write to lsm")} + let req = req.get_req(); //.entries.read(); + let ptrs = req.ptrs.read().await; + let entries = &req.entries; + assert_eq!(entries.len(), ptrs.len()); + + for (i, pair) in entries.iter().enumerate() { + let entry_pair = pair.read().await; + let entry = entry_pair.entry(); + if entry.cas_counter_check != 0 { + let old_value = self._get(&entry.key)?; + // No need to decode existing value. Just need old CAS counter. + if old_value.cas_counter != entry.cas_counter_check { + entry_pair.set_resp(Err(Error::ValueCasMisMatch)).await; + continue; + } + } + + if entry.meta == MetaBit::BIT_SET_IF_ABSENT.bits() { + // Someone else might have written a value, so lets check again if key exists. + let exits = self._exists(&entry.key)?; + // Value already exists. don't write. + if exits { + entry_pair.set_resp(Err(Error::ValueKeyExists)).await; + continue; + } + } + + if self.should_write_value_to_lsm(entry) { + // Will include deletion/tombstone case. 
+ self.must_mt().put( + &entry.key, + ValueStruct::new( + entry.value.clone(), // TODO avoid value clone + entry.meta, + entry.user_meta, + entry.cas_counter, + ), + ); + } else { + let ptr = ptrs.get(i).unwrap().as_ref().unwrap(); + let mut wt = Cursor::new(vec![0u8; ValuePointer::value_pointer_encoded_size()]); + ptr.enc(&mut wt).unwrap(); + self.must_mt().put( + &entry.key, + ValueStruct::new( + wt.into_inner(), + entry.meta | MetaBit::BIT_VALUE_POINTER.bits(), + entry.user_meta, + entry.cas_counter, + ), + ); + } + } + + Ok(()) + } + + fn _exists(&self, key: &[u8]) -> Result { + let value = self._get(key)?; + if value.value.is_empty() && value.meta == 0 { + return Ok(false); + } + if value.meta & MetaBit::BIT_DELETE.bits() != 0 { + return Ok(false); } - Ok(reqs) + + Ok(true) } fn new_cas_counter(&self, how_many: u64) -> u64 { self.last_used_cas_counter .fetch_add(how_many, Ordering::Relaxed) + + 1 + } + + async fn ensure_room_for_write(&self) -> Result<()> { + defer! {info!("exit ensure room for write!")} + // TODO a special global lock for this function + let _ = self.share_lock.write().await; + if self.must_mt().mem_size() < self.opt.max_table_size as u32 { + return Ok(()); + } + info!("flush memory table"); + // A nil mt indicates that KV is being closed. + assert!(!self.must_mt().empty()); + let flush_task = FlushTask { + mt: Some(self.must_mt().clone()), + vptr: self.must_vptr(), + }; + if let Ok(_) = self.flush_chan.try_send(flush_task) { + info!("Flushing value log to disk if async mode."); + // Ensure value log is synced to disk so this memtable's contents wouldn't be lost. + self.must_vlog().sync()?; + info!( + "Flushing memtable, mt.size={} size of flushChan: {}", + self.must_mt().mem_size(), + self.flush_chan.tx().len() + ); + // We manage to push this task. Let's modify imm. + self.mem_st_manger.swap_st(self.opt.clone()); + // New memtable is empty. We certainly have room. + Ok(()) + } else { + Err(Unexpected("No room for write".into())) + } + } + + fn should_write_value_to_lsm(&self, entry: &Entry) -> bool { + entry.value.len() < self.opt.value_threshold + } + + fn update_offset(&self, ptrs: RwLockReadGuard>>) { + let mut ptr = &ValuePointer::default(); + for tmp_ptr in ptrs.iter().rev() { + if tmp_ptr.is_none() || tmp_ptr.as_ref().unwrap().is_zero() { + continue; + } + ptr = tmp_ptr.as_ref().unwrap(); + break; + } + + if ptr.is_zero() { + return; + } + + self.vptr.store(Owned::new(ptr.clone()), Ordering::Release); } } @@ -497,9 +700,10 @@ impl KV { lc } - fn must_mt(&self) -> &SkipList { + pub(crate) fn must_mt(&self) -> &SkipList { let p = crossbeam_epoch::pin(); let st = self.mem_st_manger.mt_ref(&p).as_raw(); + assert!(!st.is_null()); unsafe { &*st } } @@ -556,6 +760,117 @@ impl ArcKV { self.manifest.write().await } + /// Return a value that will async load value, if want not return value, should be `exists` + pub async fn get(&self, key: &[u8]) -> Result> { + let got = self._get(key)?; + let inner = KVItemInner::new(key.to_vec(), got, self.clone()); + inner.get_value().await + } + + pub(crate) async fn get_with_ext(&self, key: &[u8]) -> Result { + let got = self._get(key)?; + let inner = KVItemInner::new(key.to_vec(), got, self.clone()); + Ok(TArcRW::new(tokio::sync::RwLock::new(inner))) + } + + /// Set sets the provided value for a given key. If key is not present, it is created. If it is + /// present, the existing value is overwritten with the one provided. + /// Along with key and value, Set can also take an optional userMeta byte. 
This byte is stored + /// alongside the key, and can be used as an aid to interpret the value or store other contextual + /// bits corresponding to the key-value pair. + pub async fn set(&self, key: Vec, value: Vec, user_meta: u8) -> Result<()> { + self.to_ref().set(key, value, user_meta).await + } + + /// Sets value of key if key is not present. + /// If it is present, it returns the key_exists error. + /// TODO it should be atomic operate + pub async fn set_if_ab_sent(&self, key: Vec, value: Vec, user_meta: u8) -> Result<()> { + let exists = self.exists(&key).await?; + // found the key, return key_exists + if exists { + return Err(Error::ValueKeyExists); + } + let entry = Entry::default() + .key(key) + .value(value) + .user_meta(user_meta) + .meta(MetaBit::BIT_SET_IF_ABSENT.bits()); + let ret = self.batch_set(vec![entry]).await; + ret[0].to_owned() + } + + /// Return Ok(true) if key exists, Ok(false) if key not exists, Otherwise Err(err) if happen some error. + pub async fn exists(&self, key: &[u8]) -> Result { + return self._exists(key); + } + + /// Batch set entries, returns result sets + pub async fn batch_set(&self, entries: Vec) -> Vec> { + self.to_ref().batch_set(entries).await + } + + /// CompareAndSetAsync is the asynchronous version of CompareAndSet. It accepts a callback function + /// which is called when the CompareAndSet completes. Any error encountered during execution is + /// passed as an argument to the callback function. + pub async fn compare_and_set( + &self, + key: Vec, + value: Vec, + cas_counter: u64, + ) -> Result<()> { + let entry = Entry::default() + .key(key) + .value(value) + .cas_counter_check(cas_counter); + let ret = self.batch_set(vec![entry]).await; + ret[0].to_owned() + } + + /// Delete deletes a key. + /// Exposing this so that user does not have to specify the Entry directly. + /// For example, BitDelete seems internal to badger. + pub async fn delete(&self, key: Vec) -> Result<()> { + let entry = Entry::default().key(key); + let ret = self.batch_set(vec![entry]).await; + ret[0].to_owned() + } + + /// CompareAndDelete deletes a key ensuring that it has not been changed since last read. + /// If existing key has different casCounter, this would not delete the key and return an error. + pub async fn compare_and_delete(&self, key: Vec, cas_counter: u64) -> Result<()> { + let entry = Entry::default().key(key).cas_counter_check(cas_counter); + let ret = self.batch_set(vec![entry]).await; + ret[0].to_owned() + } + + /// RunValueLogGC would trigger a value log garbage collection with no guarantees that a call would + /// result in a space reclaim. Every run would in the best case rewrite only one log file. So, + /// repeated calls may be necessary. + /// + /// The way it currently works is that it would randomly pick up a value log file, and sample it. If + /// the sample shows that we can discard at least discardRatio space of that file, it would be + /// rewritten. Else, an ErrNoRewrite error would be returned indicating that the GC didn't result in + /// any file rewrite. + /// + /// We recommend setting discardRatio to 0.5, thus indicating that a file be rewritten if half the + /// space can be discarded. This results in a lifetime value log write amplification of 2 (1 from + /// original write + 0.5 rewrite + 0.25 + 0.125 + ... = 2). Setting it to higher value would result + /// in fewer space reclaims, while setting it to a lower value would result in more space reclaims at + /// the cost of increased activity on the LSM tree. 
discardRatio must be in the range (0.0, 1.0), + /// both endpoints excluded, otherwise an ErrInvalidRequest is returned. + /// + /// Only one GC is allowed at a time. If another value log GC is running, or KV has been closed, this + /// would return an ErrRejected. + /// + /// Note: Every time GC is run, it would produce a spike of activity on the LSM tree. + pub async fn run_value_log_gc(&self, discard_ratio: f64) -> Result<()> { + if discard_ratio >= 1.0 || discard_ratio <= 0.0 { + return Err(Error::ValueInvalidRequest); + } + self.must_vlog().trigger_gc(discard_ratio).await + } + /// Closes a KV. It's crucial to call it to ensure all the pending updates /// make their way to disk. pub async fn close(&self) -> Result<()> { @@ -666,97 +981,73 @@ impl ArcKV { } impl ArcKV { - async fn do_writes(&self, lc: Closer) { + async fn do_writes(&self, lc: Closer, without_close_write_ch: bool) { info!("start do writes task!"); - defer! {lc.done();}; + defer! {info!("exit writes task!")} + defer! {lc.done()} // TODO add metrics let has_been_close = lc.has_been_closed(); let write_ch = self.write_ch.clone(); let reqs = ArcMx::>::new(Mutex::new(vec![])); + let to_reqs = || { + let to_reqs = reqs + .lock() + .clone() + .into_iter() + .map(|req| req.clone()) + .collect::>(); + reqs.lock().clear(); + Arc::new(to_reqs) + }; loop { tokio::select! { - _ = has_been_close.recv() => { + ret = has_been_close.recv() => { break; }, req = write_ch.recv() => { + if req.is_err() { + assert!(write_ch.is_close()); + info!("receive a invalid write task, err: {:?}", req.unwrap_err()); + break; + } reqs.lock().push(req.unwrap()); } } - // TODO avoid memory allocate again - if reqs.lock().len() == 100 { - let to_reqs = reqs - .lock() - .clone() - .into_iter() - .map(|req| req.clone()) - .collect::>(); - let to_reqs = Arc::new(to_reqs); - if let Err(err) = self.write_requests(to_reqs).await { - // for req in reqs.lock().iter() { - // req.set_err(Err(err.clone())).await; - // } + + let to_reqs = if reqs.lock().len() == 100 { + to_reqs() + } else { + if let Ok(req) = write_ch.try_recv() { + reqs.lock().push(req); + Arc::new(vec![]) + } else { + to_reqs() } - reqs.lock().clear(); + }; + + if !to_reqs.is_empty() { + self.write_requests(to_reqs.clone()) + .await + .expect("TODO: panic message"); } } // clear future requests - write_ch.close(); + if !without_close_write_ch { + assert!(!write_ch.is_close()); + write_ch.close(); + } loop { let req = write_ch.try_recv(); - if req.is_err() { + if let Err(err) = &req { + assert!(err.is_closed() || err.is_empty(), "{:?}", err); break; } - let req = req.unwrap(); - reqs.lock().push(req); - let to_reqs = reqs - .lock() - .clone() - .into_iter() - .map(|req| req.clone()) - .collect::>(); - if let Err(err) = self.write_requests(Arc::new(to_reqs)).await { - // for req in reqs.lock().iter() { - // req.set_err(Err(err.clone())).await; - // } - } - } - } - - fn should_write_value_to_lsm(&self, entry: &Entry) -> bool { - entry.value.len() < self.opt.value_threshold - } - - // Always called serially. - async fn ensure_room_for_write(&self) -> Result<()> { - // TODO a special global lock for this function - let _ = self.share_lock.write().await; - if self.must_mt().mem_size() < self.opt.max_table_size as u32 { - return Ok(()); - } - - // A nil mt indicates that KV is being closed. 
- assert!(!self.must_mt().empty()); - - let flush_task = FlushTask { - mt: Some(self.must_mt().clone()), - vptr: self.must_vptr(), - }; - if let Ok(_) = self.flush_chan.try_send(flush_task) { - info!("Flushing value log to disk if async mode."); - // Ensure value log is synced to disk so this memtable's contents wouldn't be lost. - self.must_vlog().sync()?; - info!( - "Flushing memtable, mt.size={} size of flushChan: {}", - self.must_mt().mem_size(), - self.flush_chan.tx().len() - ); - // We manage to push this task. Let's modify imm. - self.mem_st_manger.swap_st(self.opt.clone()); - // New memtable is empty. We certainly have room. - Ok(()) - } else { - Err(Unexpected("No room for write".into())) + reqs.lock().push(req.unwrap()); + let to_reqs = to_reqs(); + self.write_requests(to_reqs.clone()) + .await + .expect("TODO: panic message"); } } @@ -799,22 +1090,3 @@ async fn write_level0_table(st: &SkipList, f: &mut tokio::fs::File) -> Result<() fn arena_size(opt: &Options) -> usize { (opt.max_table_size + opt.max_batch_size + opt.max_batch_count * Node::size() as u64) as usize } - -#[test] -fn t_pointer() { - struct Ext { - v: Vec, - name: String, - } - - let t = Ext { - v: vec![], - name: "Name".to_owned(), - }; - - let p = unsafe { &t as *const Ext }; - - let arc_p = Arc::new(t); - - print!("==> {:?}", p); -} diff --git a/src/kv_test.rs b/src/kv_test.rs new file mode 100644 index 0000000..a656842 --- /dev/null +++ b/src/kv_test.rs @@ -0,0 +1,145 @@ +use log::info; +use log::kv::ToValue; +use std::env::temp_dir; +use std::io::Write; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use crate::iterator::IteratorOptions; +use crate::types::XArc; +use crate::value_log::Entry; +use crate::{kv::KV, options::Options}; + +fn get_test_option(dir: &str) -> Options { + let mut opt = Options::default(); + opt.max_table_size = 1 << 15; //Force more compaction. + opt.level_one_size = 4 << 15; // Force more compaction. 
+ opt.dir = Box::new(dir.clone().to_string()); + opt.value_dir = Box::new(dir.to_string()); + opt +} + +#[tokio::test] +async fn t_1_write() { + use crate::test_util::{mock_log, mock_log_terminal, random_tmp_dir, tracing_log}; + tracing_log(); + // console_subscriber::init(); + let dir = random_tmp_dir(); + let kv = KV::open(get_test_option(&dir)).await; + let kv = kv.unwrap(); + let res = kv.set(b"hello".to_vec(), b"word".to_vec(), 10).await; + // assert!(res.is_ok()); + // let got = kv._get(b"hello"); + // assert!(got.is_ok()); + // assert_eq!(&got.unwrap().value, b"word"); +} + +#[tokio::test] +async fn t_batch_write() { + use crate::test_util::{mock_log, mock_log_terminal, random_tmp_dir, tracing_log}; + tracing_log(); + let dir = random_tmp_dir(); + let kv = KV::open(get_test_option(&dir)).await; + let kv = kv.unwrap(); + let n = 100; + for i in 0..n { + let res = kv + .set(format!("{}", i).as_bytes().to_vec(), b"word".to_vec(), 10) + .await; + assert!(res.is_ok()); + } + + for i in 0..n { + let got = kv._get(format!("{}", i).as_bytes()); + assert!(got.is_ok()); + assert_eq!(&got.unwrap().value, b"word"); + } +} + +#[tokio::test] +async fn t_concurrent_write() { + use crate::test_util::{mock_log, mock_log_terminal, random_tmp_dir, tracing_log}; + tracing_log(); + let dir = random_tmp_dir(); + let kv = KV::open(get_test_option(&dir)).await; + let kv = kv.unwrap(); + let mut wg = awaitgroup::WaitGroup::new(); + let n = 200; + for i in 0..n { + let kv = kv.clone(); + let wk = wg.worker(); + tokio::spawn(async move { + let res = kv + .set(format!("{}", i).as_bytes().to_vec(), b"word".to_vec(), 10) + .await; + assert!(res.is_ok()); + wk.done(); + }); + } + + wg.wait().await; + info!("Starting iteration"); + let itr = kv + .new_iterator(IteratorOptions { + reverse: false, + pre_fetch_values: true, + pre_fetch_size: 10, + }) + .await; + let mut i = 0; + while let Some(item) = itr.next().await { + let item = item.read().await; + assert_eq!(item.key(), format!("{}", i).as_bytes()); + assert_eq!(item.get_value().await.unwrap(), b"word".to_vec()); + i += 1; + } +} + +#[tokio::test] +async fn t_cas() { + let n = 100; + let kv = build_kv().await; + let entries = (0..n) + .into_iter() + .map(|i| { + Entry::default() + .key(format!("{}", i).into_bytes()) + .value(format!("{}", i).into_bytes()) + }) + .collect::>(); + for got in kv.batch_set(entries.clone()).await { + assert!(got.is_ok()); + } + tokio::time::sleep(Duration::from_secs(1)).await; + let mut items = vec![]; + for i in 0..n { + let key = format!("{}", i).as_bytes().to_vec(); + let value = format!("{}", i).as_bytes().to_vec(); + let got = kv.get_with_ext(&key).await.unwrap(); + let got_value = got.read().await.get_value().await.unwrap(); + assert_eq!(got_value, value); + items.push(got); + } + + for i in 0..n { + let key = format!("{}", i).into_bytes(); + let value = format!("{}", i).into_bytes(); + let mut cc = items[i].read().await.counter(); + println!("counter: {}", cc); + if cc == 5 { + cc = 6; + } else { + cc = 5; + } + assert!(kv.compare_and_set(key, value, cc).await.is_err()); + } +} + +async fn build_kv() -> XArc { + use crate::test_util::{mock_log, mock_log_terminal, random_tmp_dir, tracing_log}; + tracing_log(); + let dir = random_tmp_dir(); + let kv = KV::open(get_test_option(&dir)).await; + let kv = kv.unwrap(); + kv +} diff --git a/src/levels.rs b/src/levels.rs index 3a971d0..0743031 100644 --- a/src/levels.rs +++ b/src/levels.rs @@ -830,3 +830,8 @@ async fn revert_to_manifest( } Ok(()) } + +#[test] +fn it() { + +} \ No 
newline at end of file diff --git a/src/lib.rs b/src/lib.rs index fd33485..d7e554d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,9 +33,11 @@ mod value_log; #[cfg(test)] mod value_log_tests; mod y; +mod level_handler; mod compaction; -mod level_handler; +// #[cfg(test)] +// mod kv_test; mod levels; mod mmap; mod pb; @@ -45,7 +47,6 @@ mod test_util; #[cfg(test)] mod kv_test; -mod db; pub use skl::*; pub use st_manager::*; diff --git a/src/log_file.rs b/src/log_file.rs index 359b553..8ae83eb 100644 --- a/src/log_file.rs +++ b/src/log_file.rs @@ -99,7 +99,7 @@ impl LogFile { Ok((v, cursor_offset)) } - // async iterate from offset that must be call with thread safty + // async iterate from offset that must be call with thread safety pub(crate) async fn iterate_by_offset( &self, mut offset: u32, diff --git a/src/options/mod.rs b/src/options/mod.rs index a9e658d..79d970a 100644 --- a/src/options/mod.rs +++ b/src/options/mod.rs @@ -107,6 +107,3 @@ impl Default for Options { } } } - -#[test] -fn it() {} diff --git a/src/skl/alloc.rs b/src/skl/alloc.rs index 5378502..0777510 100644 --- a/src/skl/alloc.rs +++ b/src/skl/alloc.rs @@ -5,8 +5,13 @@ use std::fmt::{Debug, Display, Formatter}; use std::marker::PhantomData; use std::mem::{align_of, size_of, ManuallyDrop}; +use atom_box::AtomBox; +use either::Either; +use libc::off_t; +use log::info; +use std::alloc::alloc; use std::ptr::{slice_from_raw_parts, slice_from_raw_parts_mut, NonNull}; -use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicPtr, AtomicU32, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; use std::thread::{sleep, spawn}; use std::time::Duration; @@ -14,75 +19,51 @@ use std::{ptr, thread}; pub trait Allocate: Send + Sync { type Block; + #[inline] fn alloc(&self, start: usize, n: usize) -> Self::Block; + #[inline] fn size(&self) -> usize; + #[inline] + fn used_count(&self) -> usize; } pub trait Chunk: Send + Sync { + #[inline] fn get_data(&self) -> &[u8]; + #[inline] fn get_data_mut(&self) -> &mut [u8]; + #[inline] fn size(&self) -> usize; } #[derive(Debug)] #[repr(C)] -pub struct SmartAllocate { - pub(crate) ptr: std::mem::ManuallyDrop>, -} - -impl Allocate for SmartAllocate { - type Block = impl Chunk; - - fn alloc(&self, start: usize, n: usize) -> Self::Block { - let ptr = self.get_data_ptr(); - let block_ptr = unsafe { ptr.add(start) as *mut u8 }; - let block = BlockBytes::new(NonNull::new(block_ptr).unwrap(), n); - block - } - - fn size(&self) -> usize { - self.ptr.len() - } -} - -impl SmartAllocate { - pub(crate) fn new(m: std::mem::ManuallyDrop>) -> Self { - println!("new a alloc memory, len: {}", m.len()); - SmartAllocate { ptr: m } - } - - #[inline] - pub(crate) fn get_data_ptr(&self) -> *const u8 { - self.ptr.as_ptr() - } -} - -#[derive(Clone, Debug)] -#[repr(C)] pub struct BlockBytes { - start: NonNull, + start: AtomicPtr, n: usize, } -unsafe impl Send for BlockBytes {} - -unsafe impl Sync for BlockBytes {} - impl BlockBytes { - pub(crate) fn new(start: NonNull, n: usize) -> Self { - BlockBytes { start, n } + pub(crate) fn new(start: *mut u8, n: usize) -> Self { + BlockBytes { + start: AtomicPtr::new(start), + n, + } } } impl Chunk for BlockBytes { + #[inline] fn get_data(&self) -> &[u8] { - unsafe { &*slice_from_raw_parts(self.start.as_ptr(), self.n) } + unsafe { &*slice_from_raw_parts(self.start.load(Ordering::Relaxed), self.n) } } + #[inline] fn get_data_mut(&self) -> &mut [u8] { - unsafe { &mut *slice_from_raw_parts_mut(self.start.as_ptr(), self.n) } + unsafe { &mut 
*slice_from_raw_parts_mut(self.start.load(Ordering::Relaxed), self.n) } } + #[inline] fn size(&self) -> usize { self.n } @@ -92,7 +73,7 @@ impl Chunk for BlockBytes { #[derive(Debug, Clone)] pub struct OnlyLayoutAllocate { cursor: Arc, - len: Arc, + cap: Arc, pub(crate) ptr: ManuallyDrop>, _data: PhantomData, } @@ -102,14 +83,18 @@ unsafe impl Send for OnlyLayoutAllocate {} unsafe impl Sync for OnlyLayoutAllocate {} impl OnlyLayoutAllocate { - fn size() -> usize { + pub(crate) fn size() -> usize { size_of::() } + pub(crate) fn len(&self) -> usize { + self.cursor.load(Ordering::Relaxed) + } + pub fn new(n: usize) -> Self { OnlyLayoutAllocate { cursor: Arc::from(AtomicUsize::new(Self::size())), - len: Arc::from(AtomicUsize::new(n)), + cap: Arc::from(AtomicUsize::new(n)), ptr: ManuallyDrop::new(vec![0u8; n]), _data: Default::default(), } @@ -119,7 +104,7 @@ impl OnlyLayoutAllocate { /// **Note** if more than len, it will be panic. pub fn alloc(&self, start: usize) -> &T { let end = self.cursor.fetch_add(Self::size(), Ordering::Acquire); - assert!(end < self.len.load(Ordering::Relaxed)); + assert!(end < self.cap.load(Ordering::Relaxed)); let ptr = self.borrow_slice(start, Self::size()); let (pre, mid, suf) = unsafe { ptr.align_to() }; assert!(pre.is_empty()); @@ -130,7 +115,7 @@ impl OnlyLayoutAllocate { /// **Note** if more than len, it will be panic. pub fn mut_alloc(&self, start: usize) -> &mut T { let end = self.cursor.fetch_add(Self::size(), Ordering::Relaxed); - assert!(end < self.len.load(Ordering::Relaxed)); + assert!(end < self.cap.load(Ordering::Relaxed)); let ptr = self.borrow_mut_slice(start, Self::size()); let (pre, mid, _) = unsafe { ptr.align_to_mut() }; assert!(pre.is_empty()); @@ -139,7 +124,7 @@ impl OnlyLayoutAllocate { pub fn alloc_offset(&self) -> (&T, usize) { let offset = self.cursor.fetch_add(Self::size(), Ordering::Relaxed); - assert!(offset + Self::size() < self.len.load(Ordering::Relaxed)); + assert!(offset + Self::size() < self.cap.load(Ordering::Relaxed)); let ptr = self.borrow_slice(offset, Self::size()); let (pre, mid, _) = unsafe { ptr.align_to() }; assert!(pre.is_empty()); @@ -163,9 +148,8 @@ impl OnlyLayoutAllocate { } pub(crate) fn reset(&self) { - self.len.store(0, Ordering::Relaxed); + self.cap.store(0, Ordering::Relaxed); self.cursor.store(0, Ordering::Relaxed); - //self.ptr.clear(); } #[inline] @@ -194,10 +178,8 @@ impl OnlyLayoutAllocate { impl Drop for OnlyLayoutAllocate { fn drop(&mut self) { self.cursor.store(0, Ordering::Relaxed); - self.cursor.store(0, Ordering::Relaxed); - // TODO: free memory unsafe { - // ManuallyDrop::into_inner(self.ptr); + ManuallyDrop::drop(&mut self.ptr); } } } @@ -206,23 +188,21 @@ impl Drop for OnlyLayoutAllocate { #[derive(Debug)] pub struct SliceAllocate { cursor: Arc, - len: Arc, + cap: Arc, pub(crate) ptr: ManuallyDrop>, } unsafe impl Send for SliceAllocate {} -unsafe impl Sync for SliceAllocate {} - impl SliceAllocate { - fn size(&self) -> usize { - self.len.load(Ordering::Relaxed) + pub fn len(&self) -> usize { + self.cursor.load(Ordering::Relaxed) } pub(crate) fn new(n: usize) -> Self { SliceAllocate { cursor: Arc::from(AtomicUsize::new(1)), - len: Arc::from(AtomicUsize::new(n)), + cap: Arc::from(AtomicUsize::new(n)), ptr: ManuallyDrop::new(vec![0u8; n]), } } @@ -231,26 +211,22 @@ impl SliceAllocate { self.borrow_slice(start, size) } - // fn get_mut(&mut self, start: usize, size: usize) -> &mut [u8] { - // self.borrow_mut_slice(start, size) - // } - // Return the start locate offset pub(crate) fn alloc(&self, size: 
usize) -> &[u8] { let offset = self.cursor.fetch_add(size, Ordering::Relaxed); - assert!(self.cursor.load(Ordering::Relaxed) < self.len.load(Ordering::Relaxed)); + assert!(self.cursor.load(Ordering::Relaxed) < self.cap.load(Ordering::Relaxed)); self.borrow_slice(offset, size) } fn alloc_mut(&self, size: usize) -> &mut [u8] { let offset = self.cursor.fetch_add(size, Ordering::Relaxed); - assert!(self.cursor.load(Ordering::Relaxed) < self.len.load(Ordering::Relaxed)); + assert!(self.cursor.load(Ordering::Relaxed) < self.cap.load(Ordering::Relaxed)); self.borrow_mut_slice(offset, size) } pub fn append(&self, bytes: &[u8]) -> usize { let offset = self.cursor.fetch_add(bytes.len(), Ordering::Relaxed); - assert!(self.cursor.load(Ordering::Relaxed) < self.len.load(Ordering::Relaxed)); + assert!(self.cursor.load(Ordering::Relaxed) < self.cap.load(Ordering::Relaxed)); let buffer = self.borrow_mut_slice(offset, bytes.len()); buffer.copy_from_slice(bytes); offset @@ -263,7 +239,7 @@ impl SliceAllocate { } pub fn reset(&self) { - self.len.swap(0, Ordering::Relaxed); + self.cap.swap(0, Ordering::Relaxed); self.cursor.store(0, Ordering::Relaxed); //self.ptr.clear(); } @@ -291,18 +267,6 @@ impl SliceAllocate { } } -// impl Allocate for SliceAllocate { -// type Block = (); -// -// fn alloc(&self, start: usize, n: usize) -> Self::Block { -// todo!() -// } -// -// fn size(&self) -> usize { -// todo!() -// } -// } - #[test] fn t_onlylayoutalloc() { let mut alloc: OnlyLayoutAllocate = OnlyLayoutAllocate::new(1 << 10); @@ -352,7 +316,7 @@ fn t_onlylayoutalloc_slice() { #[test] fn t_block_bytes() { let mut buffer = vec![0u8; 1024]; - let block = BlockBytes::new(NonNull::new(buffer.as_mut_ptr()).unwrap(), 10); + let block = BlockBytes::new(buffer.as_mut_ptr(), 10); { let data = block.get_data_mut(); for datum in 0..data.len() { @@ -363,6 +327,3 @@ fn t_block_bytes() { assert_eq!(buffer[datum], datum as u8); } } - -#[test] -fn t_clone() {} diff --git a/src/skl/arena.rs b/src/skl/arena.rs index 363f76c..2fdfb67 100644 --- a/src/skl/arena.rs +++ b/src/skl/arena.rs @@ -1,8 +1,8 @@ // use crate::skl::{Node, OwnedNode, MAX_HEIGHT, MAX_NODE_SIZE}; +use crate::skl::alloc::Chunk; use crate::skl::alloc::{OnlyLayoutAllocate, SliceAllocate}; use crate::skl::node::Node; use crate::skl::Allocate; -use crate::skl::{alloc::Chunk, SmartAllocate}; use crate::y::ValueStruct; use std::default; use std::fmt::format; @@ -27,10 +27,6 @@ pub struct Arena { node_alloc: OnlyLayoutAllocate, } -unsafe impl Send for Arena {} - -unsafe impl Sync for Arena {} - impl Arena { pub(crate) fn new(n: usize) -> Self { assert!(n > 0); @@ -45,12 +41,7 @@ impl Arena { } pub(crate) fn size(&self) -> u32 { - todo!() - } - - pub(crate) fn cap(&self) -> usize { - // self.slice.size() - todo!() + (self.slice.len() + self.node_alloc.len()) as u32 } // TODO: maybe use MaybeUint instead diff --git a/src/skl/mod.rs b/src/skl/mod.rs index bc9303e..7cda973 100644 --- a/src/skl/mod.rs +++ b/src/skl/mod.rs @@ -4,7 +4,7 @@ mod cursor; mod node; mod skip; -pub use alloc::{Allocate, BlockBytes, Chunk, SmartAllocate}; +pub use alloc::{Allocate, BlockBytes, Chunk}; pub use arena::Arena; pub use cursor::Cursor; pub use node::Node; diff --git a/src/skl/skip.rs b/src/skl/skip.rs index 9b376dc..9df4c13 100644 --- a/src/skl/skip.rs +++ b/src/skl/skip.rs @@ -2,7 +2,7 @@ use crate::skl::{Cursor, HEIGHT_INCREASE, MAX_HEIGHT}; use crate::table::iterator::IteratorItem; use crate::Xiterator; use atom_box::AtomBox; -use log::info; +use log::{debug, info}; use rand::random; use 
serde_json::Value; use std::borrow::Cow; @@ -13,6 +13,7 @@ use std::ptr::null_mut; use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Arc; use std::{cmp, ptr, ptr::NonNull, sync::atomic::AtomicI32}; +use tracing::field::debug; use crate::y::ValueStruct; @@ -67,12 +68,10 @@ impl SkipList { } pub(crate) fn arena_ref(&self) -> &Arena { - // unsafe {self.arena.as_ref()} &self.arena } pub(crate) fn arena_mut_ref(&self) -> &Arena { - // unsafe {self.arena.as_mut()} &self.arena } @@ -108,7 +107,6 @@ impl SkipList { ) -> (Option<&Node>, bool) { let mut x = self.get_head(); let mut level = self.get_height() - 1; - //println!("start to hight: {}", level); loop { // Assume x.key < key let mut next = self.get_next(x, level); @@ -348,11 +346,11 @@ impl SkipList { // gets the value associated with the key. // FIXME: maybe return Option<&ValueStruct> pub(crate) fn get(&self, key: &[u8]) -> Option { + // info!("find a key: {:?}", key); let (node, found) = self.find_near(key, false, true); if !found { return None; } - // println!("find a key: {:?}", key); let (value_offset, value_size) = node.unwrap().get_value_offset(); Some(self.arena_ref().get_val(value_offset, value_size)) } @@ -395,7 +393,6 @@ impl Drop for SkipList { fn drop(&mut self) { let _ref = self._ref.load(Ordering::Relaxed); info!("Drop SkipList, reference: {}", _ref); - self.arena_mut_ref().reset(); } } @@ -602,63 +599,6 @@ impl SkipIterator { } } -// impl Xiterator for SkipIterator { -// type Output = IteratorItem; -// -// fn next(&self) -> Option { -// todo!() -// } -// -// fn rewind(&self) -> Option { -// todo!() -// } -// -// fn seek(&self, key: &[u8]) -> Option { -// if self.node.load(Ordering::Relaxed).is_null() { -// return None; -// } -// -// let node = self.node.load(Ordering::Relaxed); -// if node.is_null() { -// return None; -// } -// let key = node.key(self.st.arena_ref()).to_vec(); -// let value = node.value.load(Ordering::Relaxed); -// Some(IteratorItem{ key: node.key(self.st.arena_ref()).to_vec(), value: Default::default() }) -// } -// -// fn peek(&self) -> Option { -// todo!() -// } -// -// fn close(&self) { -// todo!() -// } -// } - -// impl<'a> Xiterator for SkipIterator<'a> { -// type Output = &'a Node; -// fn next(&self) -> Option { -// todo!() -// } -// -// fn rewind(&self) -> Option { -// todo!() -// } -// -// fn seek(&self, key: &[u8]) -> Option { -// todo!() -// } -// -// fn peek(&self) -> Option { -// todo!() -// } -// -// fn close(&self) { -// self.st.decr_ref(); -// } -// } - mod tests { use crate::skl::node::Node; use crate::skl::skip::SkipList; @@ -1089,30 +1029,15 @@ mod tests { } mod tests2 { - use crate::SkipList; + use crate::{SkipList, ValueStruct}; const ARENA_SIZE: usize = 1 << 20; #[test] fn atomic_swap_skip_list() { let mut st = SkipList::new(ARENA_SIZE); - } - - #[test] - fn gat() { - // #![allow(unused)] - - // trait IterableTypes { - // type Item<'me>; - // type Iterator<'me>: Iterator>; - // } - - // trait Iterable: IterableTypes { - // fn iter<'a>(&'a self) -> Self::Iterator<'a>; - // } - - // struct GatSimple {} - - // impl GatSimple {} + st.put(b"hello", ValueStruct::new(vec![], 0, 0, 0)); + let got = st.get(b"hello"); + assert!(got.is_some()); } } diff --git a/src/types.rs b/src/types.rs index ead46f6..b8fd48f 100644 --- a/src/types.rs +++ b/src/types.rs @@ -75,6 +75,10 @@ impl Channel { self.tx.as_ref().unwrap().clone() } + pub fn rx(&self) -> Receiver { + self.rx.as_ref().unwrap().clone() + } + /// consume tx and return it if exist pub fn take_tx(&mut self) -> Option> { self.tx.take() 
@@ -82,10 +86,18 @@ impl Channel { /// close *Channel*, Sender will be consumed pub fn close(&self) { + info!("close channel"); if let Some(tx) = &self.tx { tx.close(); } } + + pub fn is_close(&self) -> bool { + if let Some(tx) = &self.tx { + return tx.is_closed(); + } + true + } } #[derive(Clone)] @@ -162,6 +174,11 @@ impl Closer { /// Spawn a worker pub fn spawn(&self) -> Self { + info!( + "spawn a new closer: Worker-{}-{}", + self.name, + self.wait.load(Ordering::Relaxed) + ); self.add_running(1); self.clone() } diff --git a/src/value_log.rs b/src/value_log.rs index 0aa3f5d..6eead10 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -1,9 +1,11 @@ -use async_channel::RecvError; +use async_channel::{Receiver, RecvError, Sender}; use awaitgroup::{WaitGroup, Worker}; use bitflags::bitflags; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use crc32fast::Hasher; use drop_cell::defer; +use either::Either; +use libc::{endgrent, memchr}; use log::info; use log::kv::Source; use memmap::{Mmap, MmapMut}; @@ -12,7 +14,7 @@ use rand::random; use serde_json::to_vec; use std::cell::{Ref, RefCell, RefMut}; use std::collections::{HashMap, HashSet}; -use std::fmt::Formatter; +use std::fmt::{Display, Formatter}; use std::fs::{read_dir, remove_file, File, OpenOptions}; use std::future::Future; use std::io::{BufWriter, Cursor, Read, Seek, SeekFrom, Write}; @@ -35,7 +37,7 @@ use crate::log_file::LogFile; use crate::options::Options; use crate::skl::BlockBytes; use crate::table::iterator::BlockSlice; -use crate::types::{ArcRW, Channel, Closer, TArcMx, XArc}; +use crate::types::{ArcMx, ArcRW, Channel, Closer, TArcMx, TArcRW, XArc}; use crate::y::{ create_synced_file, is_eof, open_existing_synced_file, read_at, sync_directory, Decode, Encode, }; @@ -104,7 +106,7 @@ impl Decode for Header { /// Entry provides Key, Value and if required, cas_counter_check to kv.batch_set() API. /// If cas_counter_check is provided, it would be compared against the current `cas_counter` /// assigned to this key-value. Set be done on this key only if the counters match. 
-#[derive(Default)] +#[derive(Default, Clone, Debug)] pub struct Entry { pub(crate) key: Vec, pub(crate) meta: u8, @@ -117,6 +119,33 @@ pub struct Entry { pub(crate) cas_counter: u64, } +impl Entry { + pub fn key(mut self, key: Vec) -> Self { + self.key = key; + self + } + + pub fn value(mut self, value: Vec) -> Self { + self.value = value; + self + } + + pub fn meta(mut self, meta: u8) -> Self { + self.meta = meta; + self + } + + pub fn user_meta(mut self, user_meta: u8) -> Self { + self.user_meta = user_meta; + self + } + + pub fn cas_counter_check(mut self, cas: u64) -> Self { + self.cas_counter_check = cas; + self + } +} + impl Entry { pub(crate) fn from_slice(cursor_offset: u32, m: &[u8]) -> Result { let mut entry = Entry::default(); @@ -261,38 +290,106 @@ impl Decode for ValuePointer { } } -pub(crate) struct Request { +// pub(crate) struct EntryType(Either>>); + +pub(crate) struct EntryType { + entry: Entry, + fut_ch: Channel>, +} + +impl EntryType { + pub(crate) fn entry(&self) -> &Entry { + &self.entry + } + + pub(crate) fn mut_entry(&mut self) -> &mut Entry { + &mut self.entry + } + + pub(crate) fn ret(&self) -> Receiver> { + self.fut_ch.rx() + } + + pub(crate) async fn set_resp(&self, ret: Result<()>) { + self.fut_ch.tx().send(ret).await.unwrap(); + } +} + +// impl Deref for EntryType { +// type Target = Either>; +// +// fn deref(&self) -> &Self::Target { +// &self.0 +// } +// } + +impl From for EntryType { + fn from(value: Entry) -> Self { + Self { + entry: value, + fut_ch: Channel::new(1), + } + } +} + +pub struct Request { // Input values, NOTE: RefCell is called concurrency - pub(crate) entries: RwLock>>, + pub(crate) entries: Vec>, // Output Values and wait group stuff below - pub(crate) ptrs: Mutex>>, - pub(crate) res: Channel>, + pub(crate) ptrs: tokio::sync::RwLock>>, } +// unsafe impl Send for Request {} +// +// unsafe impl Sync for Request {} + impl Default for Request { fn default() -> Self { Request { entries: Default::default(), ptrs: Default::default(), - res: Channel::new(1), } } } impl Request { - pub(crate) async fn get_resp(&self) -> Result<()> { - self.res.recv().await.unwrap() + pub(crate) async fn set_entries_resp(&self, ret: Result<()>) { + for entry in self.entries.iter() { + info!("set resp"); + entry.read().await.set_resp(ret.clone()).await; + } } - pub(crate) async fn set_resp(&self, ret: Result<()>) { - self.res.send(ret).await.unwrap() + pub async fn get_first_err(&self) -> Result<()> { + if let Some(ret) = self.entries.get(0) { + ret.read().await.ret().recv().await.unwrap() + } else { + Ok(()) + } + } + + pub async fn get_errs(&self) -> Vec> { + let mut res = vec![]; + for entry in self.entries.iter() { + let ch = entry.read().await.fut_ch.rx(); + let ret = ch.recv().await.unwrap(); + res.push(ret); + } + res } } +/// TODO add a field to indicate the request is every import, must be handle it immediately +/// Eg: compare_and_set #[derive(Clone)] -pub(crate) struct ArcRequest { - inner: Arc, - err: Arc>>, +pub struct ArcRequest { + pub(crate) inner: Arc, +} + +impl std::fmt::Debug for ArcRequest { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("ArcRequest").finish() + } } unsafe impl Send for ArcRequest {} @@ -300,21 +397,29 @@ unsafe impl Send for ArcRequest {} unsafe impl Sync for ArcRequest {} impl ArcRequest { - pub(crate) fn get_req(&self) -> Arc { - self.inner.clone() + pub fn req_ref(&self) -> &Arc { + &self.inner } - pub(crate) fn req_ref(&self) -> &Arc { - &self.inner + pub fn to_inner(self) -> Request { + 
Arc::into_inner(self.inner).unwrap() } - pub(crate) async fn set_err(&self, err: Result<()>) { - *self.err.lock() = err.clone(); - self.inner.res.send(err).await; + pub async fn is_ok(&self) -> bool { + let resp = self.get_req().get_first_err().await; + resp.is_ok() } - pub(crate) fn to_inner(self) -> Request { - Arc::into_inner(self.inner).unwrap() + pub async fn get_resp(&self) -> Result<()> { + self.get_req().get_first_err().await + } + + pub async fn set_err(&self, err: Result<()>) { + self.inner.set_entries_resp(err).await; + } + + pub(crate) fn get_req(&self) -> Arc { + self.inner.clone() } } @@ -322,7 +427,6 @@ impl From for ArcRequest { fn from(value: Request) -> Self { ArcRequest { inner: Arc::new(value), - err: Arc::new(Mutex::new(Ok(()))), } } } @@ -333,13 +437,14 @@ pub struct ValueLogCore { // TODO // guards our view of which files exist, which to be deleted, how many active iterators pub(crate) files_log: Arc>, - vlogs: Arc>>>>, // TODO It is not good idea that use raw lock for Arc>, it maybe lock AsyncRuntime thread. + vlogs: Arc>>>>, + // TODO It is not good idea that use raw lock for Arc>, it maybe lock AsyncRuntime thread. dirty_vlogs: Arc>>, // TODO why? // A refcount of iterators -- when this hits zero, we can delete the files_to_be_deleted. Why? num_active_iterators: AtomicI32, writable_log_offset: AtomicU32, - buf: ArcRW>>, + buf: TArcRW>>, opt: Options, kv: BoxKV, // Only allow one GC at a time. @@ -356,7 +461,7 @@ impl Default for ValueLogCore { dirty_vlogs: Arc::new(Default::default()), num_active_iterators: Default::default(), writable_log_offset: Default::default(), - buf: Arc::new(RwLock::new(BufWriter::new(vec![0u8; 0]))), + buf: Arc::new(tokio::sync::RwLock::new(BufWriter::new(vec![0u8; 0]))), opt: Default::default(), kv: BoxKV::new(ptr::null_mut()), garbage_ch: Channel::new(1), @@ -656,63 +761,65 @@ impl ValueLogCore { } // write is thread-unsafe by design and should not be called concurrently. - pub(crate) fn write(&self, reqs: Arc>) -> Result<()> { + pub(crate) async fn write(&self, reqs: Arc>) -> Result<()> { + defer! {info!("finished write value log");} let cur_vlog_file = self.pick_log_by_vlog_id(&self.max_fid.load(Ordering::Acquire)); - let to_disk = || -> Result<()> { - if self.buf.read().buffer().is_empty() { + + for req in reqs.iter() { + let req = req.get_req(); + for (idx, entry) in req.entries.iter().enumerate() { + if !self.opt.sync_writes + && entry.read().await.entry().value.len() < self.opt.value_threshold + { + // No need to write to value log. + // WARN: if mt not flush into disk but process abort, that will discard data(the data not write into vlog that WAL file) + req.ptrs.write().await[idx] = None; + continue; + } + + let mut ptr = ValuePointer::default(); + ptr.fid = cur_vlog_file.read().fid; + // Use the offset including buffer length so far. 
+ ptr.offset = self.writable_log_offset.load(Ordering::Acquire) + + self.buf.read().await.buffer().len() as u32; + let mut buf = self.buf.write().await; + let mut entry = entry.write().await; + let mut entry = entry.mut_entry(); + entry.enc(buf.get_mut()).unwrap(); + } + } + { + if self.buf.read().await.buffer().is_empty() { return Ok(()); } info!( " Flushing {} blocks of total size: {}", reqs.len(), - self.buf.read().buffer().len() + self.buf.read().await.buffer().len() ); - let n = cur_vlog_file - .write() - .fd - .as_mut() - .unwrap() - .write(self.buf.read().buffer())?; + let mut buffer = self.buf.write().await; + let mut buffer = buffer.get_mut(); + let mut cur_vlog_file_wt = cur_vlog_file.write(); + let fp = cur_vlog_file_wt.fd.as_mut().unwrap(); + let n = fp.write(&buffer)?; // todo add metrics - info!("Done"); self.writable_log_offset .fetch_add(n as u32, Ordering::Release); - self.buf.write().get_mut().clear(); + buffer.clear(); if self.writable_log_offset.load(Ordering::Acquire) > self.opt.value_log_file_size as u32 { - cur_vlog_file - .write() - .done_writing(self.writable_log_offset.load(Ordering::Acquire))?; + cur_vlog_file_wt.done_writing(self.writable_log_offset.load(Ordering::Acquire))?; let new_id = self.max_fid.fetch_add(1, Ordering::Release); assert!(new_id < 1 << 16, "newid will overflow u16: {}", new_id); - *cur_vlog_file.write() = + *cur_vlog_file_wt = self.create_mmap_vlog_file(new_id, 2 * self.opt.value_log_file_size)?; } Ok(()) - }; - - for req in reqs.iter() { - for (idx, entry) in req.get_req().entries.read().iter().enumerate() { - if !self.opt.sync_writes && entry.borrow().value.len() < self.opt.value_threshold { - // No need to write to value log. - req.get_req().ptrs.lock()[idx] = None; - continue; - } - - let mut ptr = ValuePointer::default(); - ptr.fid = cur_vlog_file.read().fid; - // Use the offset including buffer length so far. - ptr.offset = self.writable_log_offset.load(Ordering::Acquire) - + self.buf.read().buffer().len() as u32; - let mut buf = self.buf.write(); - entry.borrow_mut().enc(&mut *buf)?; - } } - to_disk() } // rewrite the log_file @@ -743,7 +850,7 @@ impl ValueLogCore { } // TODO don't need decode vptr let entry = &mut entries[0].0; - let vs = kv.get(&entry.key); + let vs = kv._get(&entry.key); if let Err(ref err) = vs { if err.is_not_found() { info!( @@ -819,7 +926,7 @@ impl ValueLogCore { count ); info!("REWRITE: Removing fid: {}", lf.read().fid); - kv.batch_set(write_batch).await?; + kv.batch_set(write_batch).await; info!("REWRITE: Processed {} entries in total", count); info!("REWRITE: Removing fid: {}", lf.read().fid); let mut deleted_file_now = false; @@ -912,99 +1019,74 @@ impl ValueLogCore { } pub(crate) async fn wait_on_gc(&self, lc: Closer) { - defer! {lc.done()}; + defer! {lc.done()} lc.wait().await; // wait for lc to be closed. // Block any GC in progress to finish, and don't allow any more writes to runGC by filling up // the channel of size 1. 
         self.garbage_ch.send(()).await.unwrap();
     }
-}
-struct PickVlogsGuardsReadLock<'a> {
-    vlogs: lock_api::RwLockReadGuard<
-        'a,
-        RawRwLock,
-        HashMap>>,
-    >,
-    fids: Vec,
-}
-
-struct ValueLogIterator<'a> {
-    fd: &'a File,
-}
-
-impl<'a> ValueLogIterator<'a> {
-    fn new(fd: &mut std::fs::File, offset: u32) -> Result> {
-        fd.seek(SeekFrom::Start(offset as u64))?;
-        Ok(ValueLogIterator { fd })
-    }
-
-    fn iterate(&mut self, log_file: &mut LogFile, offset: u32) -> Result<()> {
-        todo!()
-    }
-}
-
-pub struct SafeValueLog {
-    gc_channel: Channel<()>,
-    value_log: Arc,
-}
-
-impl SafeValueLog {
-    async fn trigger_gc(&self, gc_threshold: f64) -> Result<()> {
-        return match self.gc_channel.try_send(()) {
+    // Only one GC worker may run at a time.
+    pub async fn trigger_gc(&self, gc_threshold: f64) -> Result<()> {
+        return match self.garbage_ch.try_send(()) {
             Ok(()) => {
-                let ok = self.do_run_gcc(gc_threshold).await;
-                self.gc_channel.recv().await.unwrap();
+                let ok = self.do_run_gc(gc_threshold).await;
+                self.garbage_ch.recv().await.unwrap();
                 ok
             }
             Err(err) => Err(Error::ValueRejected),
         };
     }

-    async fn do_run_gcc(&self, gc_threshold: f64) -> Result<()> {
-        let lf = self.value_log.pick_log().ok_or(Error::ValueNoRewrite)?;
+    pub async fn do_run_gc(&self, gc_threshold: f64) -> Result<()> {
         #[derive(Debug, Default)]
         struct Reason {
             total: f64,
             keep: f64,
             discard: f64,
         }
-        let mut reason: TArcMx = TArcMx::default();
-        let mut window = 100.0; // lasted 100M
+        let mut reason = ArcMx::new(parking_lot::Mutex::new(Reason::default()));
+        let mut window = 100.0; // Sample at most ~100 MB per GC pass.
        let mut count = 0;
         // Pick a random start point for the log.
         let mut skip_first_m =
-            thread_rng_n((self.value_log.opt.value_log_file_size / M) as u32) as f64 - window;
+            thread_rng_n((self.opt.value_log_file_size / M) as u32) as f64 - window;
         let mut skipped = 0.0;
         let mut start = SystemTime::now();
-        // assert!(!self.value_log.kv.is_null());
-        let err = lf
-            .clone()
+        // Randomly pick a vlog file for GC.
+        let lf = self.pick_log().ok_or(Error::ValueNoRewrite)?;
+        let fid = lf.read().fid;
+        // HACK: reborrow `self` through a raw pointer to erase its lifetime so the
+        // reference can be moved into the async closure below.
+        let vlog = unsafe { &*(self as *const ValueLogCore as *mut ValueLogCore) };
+        lf.clone()
             .read()
             .iterate_by_offset(0, &mut |entry, vptr| {
-                let vlg = self.value_log.clone();
+                let kv = vlog.get_kv();
                 let reason = reason.clone();
-                let lfc = lf.clone();
                 Box::pin(async move {
-                    let kv = vlg.get_kv();
-                    let mut reason = reason.lock().await;
+                    let mut reason = reason.lock();
                     let esz = vptr.len as f64 / (1 << 20) as f64; // in MBs, +4 for the CAS stuff.
                     skipped += esz;
                     if skipped < skip_first_m {
+                        // Still inside the randomly skipped prefix.
                         return Ok(true);
                     }
                     count += 1;
+                    // TODO: make this yield interval configurable.
                     if count % 100 == 0 {
                         tokio::time::sleep(Duration::from_millis(1)).await;
                     }
                     reason.total += esz;
                     if reason.total > window {
-                        return Err("stop iteration".into());
+                        // return Err(Error::StopGC);
+                        return Ok(false);
                     }
                     if start.elapsed().unwrap().as_secs() > 10 {
-                        return Err("stop iteration".into());
+                        // return Err(Error::StopGC);
+                        return Ok(false);
                     }
-                    let vs = kv.get(&entry.key)?;
+                    // Look up the latest value for this key.
+                    let vs = kv._get(&entry.key)?;
                     if (vs.meta & MetaBit::BIT_DELETE.bits()) > 0 {
                         // Key has been deleted. Discard.
                         reason.discard += esz;
@@ -1019,22 +1101,25 @@ impl SafeValueLog {
                     assert!(!vs.value.is_empty());
                     let mut vptr = vptr.clone(); // TODO avoid copy
                     vptr.dec(&mut io::Cursor::new(vs.value))?;
-                    if vptr.fid > lfc.read().fid {
+                    if vptr.fid > fid {
                         // Value is present in a later log. Discard.
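+                        // A larger fid means the key was rewritten into a newer vlog file
+                        // after this entry was appended, so this copy is stale.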
                         reason.discard += esz;
                         return Ok(true);
                     }
+
                     if vptr.offset > entry.offset {
                         // Value is present in a later offset, but in the same log.
                         reason.discard += esz;
                         return Ok(true);
                     }
-                    if vptr.fid == lfc.read().fid && vptr.offset == entry.offset {
+
+                    if vptr.fid == fid && vptr.offset == entry.offset {
                         // This is still the active entry, This would need to be rewritten.
                         reason.keep += esz;
                     } else {
+                        // TODO: this branch should probably abort the GC pass; it is not expected to happen.
                         info!("Reason={:?}", reason);
-                        let err = vlg.read_value_bytes(&vptr, |buf| {
+                        let err = vlog.read_value_bytes(&vptr, |buf| {
                             let mut unexpect_entry = Entry::default();
                             unexpect_entry.dec(&mut io::Cursor::new(buf))?;
                             unexpect_entry.offset = vptr.offset;
@@ -1051,136 +1136,28 @@ impl SafeValueLog {
                     Ok(true)
                 })
             })
-            .await;
-
-        if err.is_err() {
-            info!(
-                "Error while iterating for RunGC: {}",
-                err.as_ref().unwrap_err()
-            );
-            return err;
-        }
-
-        info!("Fid: {} Data status={:?}", lf.read().fid, reason);
-        if reason.lock().await.total < 10.0
-            || reason.lock().await.discard < gc_threshold * reason.lock().await.total
-        {
+            .await?;
+        let reason = reason.lock();
+        info!("Fid: {} Data status={:?}", fid, reason);
+        if reason.total < 10.0 || reason.discard < gc_threshold * reason.total {
             info!("Skipping GC on fid: {}", lf.read().fid);
             return Err(Error::ValueNoRewrite);
         }
         info!("REWRITING VLOG {}", lf.read().fid);
-        self.value_log.rewrite(lf, self.value_log.get_kv()).await?;
+        self.rewrite(lf, self.get_kv()).await?;
         Ok(())
     }
 }

-#[test]
-fn it() {
-    use parking_lot::*;
-    struct Flock {
-        df: RwLock>>,
-        age: u32,
-    }
-    // impl Flock {
-    //     fn get_df(
-    //         &self,
-    //     ) -> std::result::Result<
-    //         lock_api::MappedRwLockReadGuard<'_, RawRwLock, String>,
-    //         lock_api::RwLockReadGuard<'_, RawRwLock, HashMap>,
-    //     > {
-    //         RwLockReadGuard::try_map(self.df.read(), |df| df.get(&0))
-    //     }
-    //
-    //     fn get_mut(
-    //         &self,
-    //         idx: u32,
-    //     ) -> std::result::Result<
-    //         lock_api::MappedRwLockWriteGuard<'_, RawRwLock, String>,
-    //         lock_api::RwLockWriteGuard<'_, RawRwLock, HashMap>,
-    //     > {
-    //         RwLockWriteGuard::try_map(self.df.write(), |df| df.get_mut(&idx))
-    //     }
-    // }
-
-    let mut flock = Flock {
-        df: RwLock::new(HashMap::new()),
-        age: 19,
-    };
-    {
-        flock
-            .df
-            .write()
-            .insert(0, RwLock::new("foobat".to_string()));
-        flock.df.write().insert(1, RwLock::new("ok!".to_string()));
-    }
-    // let lock1 = flock.df.write().get(&0).as_mut().unwrap();
-    // let lock2 = flock.df.write().get(&1).as_mut().unwrap();
-    // flock.df.write().insert(3, RwLock::new("ok!".to_string()));
-    // let value = RwLockReadGuard::try_map(lock1.read(), |df| Some(df));
-    // println!("WHat??? {:?}", value);
 }
-#[tokio::test]
-async fn lock1() {
-    let req: RwLock>> = RwLock::new(Vec::new());
-
-    tokio::spawn(async move {
-        let _a = &req.write()[0];
-    });
+struct PickVlogsGuardsReadLock<'a> {
+    vlogs: lock_api::RwLockReadGuard<
+        'a,
+        RawRwLock,
+        HashMap>>,
+    >,
+    fids: Vec,
 }
-#[tokio::test]
-async fn lock() {
-    use parking_lot::*;
-
-    #[derive(Debug)]
-    struct FileLog {}
-
-    #[derive(Debug)]
-    struct FileLogProxy {
-        files: HashMap>,
-    }
-
-    impl FileLogProxy {
-        fn get_file(
-            &self,
-            idx: u32,
-        ) -> parking_lot::lock_api::RwLockReadGuard<'_, RawRwLock, FileLog> {
-            let flog = self.files.get(&idx).unwrap();
-            let c = flog.read();
-            c
-        }
-
-        fn get_mut_file(
-            &self,
-            idx: u32,
-        ) -> std::result::Result<
-            parking_lot::lock_api::MappedRwLockWriteGuard<'_, RawRwLock, FileLog>,
-            parking_lot::lock_api::RwLockWriteGuard<'_, RawRwLock, FileLog>,
-        > {
-            let flog = self.files.get(&idx).unwrap();
-            RwLockWriteGuard::try_map(flog.write(), |df| Some(df))
-        }
-    }
-
-    struct ValueLog {
-        df: RwLock,
-        age: u32,
-    }
-
-    impl ValueLog {
-        // fn max_vlog_rl(
-        //     &self,
-        // ) -> parking_lot::lock_api::RwLockReadGuard<'_, RawRwLock, FileLog> {
-        //     let rl = self.rl();
-        //     let vlog = rl.get_file(0);
-        //     vlog
-        // }
-
-        // fn rl(&self) -> parking_lot::lock_api::RwLockReadGuard<'_, RawRwLock, FileLog> {
-        //     let df = self.df.read().get_file(0);
-        //     df
-        // }
-    }
-}
+#[test]
+fn it() {}
diff --git a/src/y/mod.rs b/src/y/mod.rs
index a45aab7..5c49257 100644
--- a/src/y/mod.rs
+++ b/src/y/mod.rs
@@ -79,6 +79,10 @@ pub enum Error {
     /////////////////////////////////
     #[error("Not found")]
     NotFound,
+    ////////////////////////////////
+    // GC
+    #[error("Stop iteration")]
+    StopGC,
 }

 impl Default for Error {
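
For reference, the skip-or-rewrite decision at the end of do_run_gc in the hunks above reduces to a sampling rule: rewrite the vlog file only when the sample is large enough (at least 10 MB) and its discardable share exceeds gc_threshold. The sketch below restates that rule in isolation; the SampleStats type and the main driver are invented for illustration, while the real code tracks the same numbers in its internal Reason struct.

    #[derive(Debug, Default)]
    struct SampleStats {
        total: f64,   // MB of entries sampled from the vlog file
        keep: f64,    // MB still referenced by the LSM tree
        discard: f64, // MB that is deleted or superseded
    }

    impl SampleStats {
        // Inverse of the skip check `total < 10.0 || discard < gc_threshold * total`:
        // rewrite only when the sample is both large enough and dirty enough.
        fn should_rewrite(&self, gc_threshold: f64) -> bool {
            self.total >= 10.0 && self.discard >= gc_threshold * self.total
        }
    }

    fn main() {
        let stats = SampleStats { total: 100.0, keep: 35.0, discard: 65.0 };
        // With a threshold of 0.5, a sample that is 65% discardable qualifies for a rewrite.
        assert!(stats.should_rewrite(0.5));
        println!("{:?} -> rewrite: {}", stats, stats.should_rewrite(0.5));
    }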