From 7381bd11ef38864bf18ef28da85298b957fe8f6d Mon Sep 17 00:00:00 2001 From: Rg Date: Mon, 23 Oct 2023 01:32:07 +0800 Subject: [PATCH 01/18] :dog: --- Cargo.toml | 2 ++ examples/badger.rs | 45 ++++++++++++++++++++++++++++++++------------- src/iterator.rs | 10 +++++----- src/kv.rs | 31 +++++++++++++++++++------------ src/kv_test.rs | 18 +++++++++--------- 5 files changed, 67 insertions(+), 39 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8a626bb..5476cf9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,8 @@ async-stream = "0.3.5" futures-core = "0.3.28" backtrace-on-stack-overflow = "0.3.0" protobuf = { version = "3.0.0-alpha.2", features = ["with-bytes"] } +clap = { version = "4.4.6", features = ["derive"] } +structopt = "0.3.26" [dev-dependencies] tracing-subscriber = "0.3.17" tracing-log = "0.1.3" diff --git a/examples/badger.rs b/examples/badger.rs index a787c90..8453593 100644 --- a/examples/badger.rs +++ b/examples/badger.rs @@ -1,20 +1,39 @@ +use std::path::PathBuf; +use structopt::StructOpt; + + +#[derive(StructOpt)] +#[structopt(about = "BadgerDB demo")] +enum Opt { + BackUp, +} + + #[tokio::main] async fn main() { + let env = tracing_subscriber::EnvFilter::from_default_env(); tracing_subscriber::FmtSubscriber::builder() .with_env_filter(env) .try_init() .unwrap(); - let opt = badger_rs::Options::default(); - let kv = badger_rs::KV::open(opt).await.unwrap(); - kv.set( - b"hello word".to_vec(), - b">>>>>I LOVE YOU!<<<<<".to_vec(), - 0x0, - ) - .await - .unwrap(); - - let got = kv.get(b"hello word").await.unwrap(); - println!("{}", String::from_utf8_lossy(&got)); -} + + + let opt = Opt::from_args(); + match opt { + Opt::BackUp => { + let opt = badger_rs::Options::default(); + let kv = badger_rs::DB::open(opt).await.unwrap(); + kv.set( + b"hello word".to_vec(), + b">>>>>I LOVE YOU!<<<<<".to_vec(), + 0x0, + ) + .await + .unwrap(); + + let got = kv.get(b"hello word").await.unwrap(); + println!("{}", String::from_utf8_lossy(&got)); + } + } +} \ No newline at end of file diff --git a/src/iterator.rs b/src/iterator.rs index cb83cdd..6cbb0d1 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -1,7 +1,7 @@ use crate::iterator::PreFetchStatus::Prefetched; use crate::kv::_BADGER_PREFIX; use crate::types::{ArcRW, Channel, Closer, TArcMx, TArcRW}; -use crate::{hex_str, ValueStruct, KV}; +use crate::{hex_str, ValueStruct, DB}; use crate::{ value_log::{MetaBit, ValuePointer}, Decode, MergeIterator, Result, Xiterator, EMPTY_SLICE, @@ -86,7 +86,7 @@ impl KVItem { #[derive(Clone)] pub(crate) struct KVItemInner { status: Arc>, - kv: KV, + kv: DB, key: Vec, // TODO, Opz memory vptr: Vec, @@ -121,7 +121,7 @@ impl Debug for KVItemInner { } impl KVItemInner { - pub(crate) fn new(key: Vec, value: ValueStruct, kv: KV) -> KVItemInner { + pub(crate) fn new(key: Vec, value: ValueStruct, kv: DB) -> KVItemInner { Self { status: Arc::new(Atomic::new(PreFetchStatus::Empty)), kv, @@ -287,7 +287,7 @@ pub(crate) const DEF_ITERATOR_OPTIONS: IteratorOptions = IteratorOptions { /// | | | /// IteratorExt reference pub struct IteratorExt { - kv: KV, + kv: DB, itr: MergeIterator, opt: IteratorOptions, item: ArcRW>, @@ -329,7 +329,7 @@ pub struct IteratorExt { // } impl IteratorExt { - pub(crate) fn new(kv: KV, itr: MergeIterator, opt: IteratorOptions) -> IteratorExt { + pub(crate) fn new(kv: DB, itr: MergeIterator, opt: IteratorOptions) -> IteratorExt { IteratorExt { kv, opt, diff --git a/src/kv.rs b/src/kv.rs index e699dba..086300e 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -684,13 +684,13 @@ pub type WeakKV = XWeak; /// DB handle /// ``` -/// use badger_rs::{Options, KV}; +/// use badger_rs::{Options, DB}; /// use badger_rs::IteratorOptions; /// /// #[tokio::main] /// /// pub async fn main() { -/// let kv = KV::open(Options::default()).await.unwrap(); +/// let kv = DB::open(Options::default()).await.unwrap(); /// kv.set(b"foo".to_vec(), b"bar".to_vec(), 0x0).await.unwrap(); /// let value = kv.get(b"foo").await.unwrap(); /// assert_eq!(&value, b"bar"); @@ -707,11 +707,11 @@ pub type WeakKV = XWeak; ///} /// ``` #[derive(Clone)] -pub struct KV { +pub struct DB { inner: XArc, } -impl Deref for KV { +impl Deref for DB { type Target = KVCore; fn deref(&self) -> &Self::Target { @@ -719,15 +719,15 @@ impl Deref for KV { } } -impl fmt::Debug for KV { +impl fmt::Debug for DB { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("KV").finish() } } -impl KV { +impl DB { /// Async open a KV db with Options - pub async fn open(mut opt: Options) -> Result { + pub async fn open(mut opt: Options) -> Result { opt.max_batch_size = (15 * opt.max_table_size) / 100; opt.max_batch_count = 2 * opt.max_batch_size / Node::align_size() as u64; create_dir_all(opt.dir.as_str()).await?; @@ -805,7 +805,7 @@ impl KV { } out.vlog.replace(Arc::new(vlog)); - let xout = KV::new(XArc::new(out)); + let xout = DB::new(XArc::new(out)); // update size { @@ -1261,7 +1261,14 @@ impl KV { Ok(()) } - pub async fn backup(&self, mut wt: W) -> Result<()> + /// Backup dumps a protobuf-encoded list of all entries in the database into the + /// given writer, that are newer than the specified version. It returns a + /// timestamp indicating when the entries were dumped which can be passed into a + /// later invocation to generate an incremental dump, of entries that have been + /// added/modified since the last invocation of DB.Backup() + /// + /// This can be used to backup the data in a database at a given point in time. + pub async fn backup(&self, mut wt: W, since: u64) -> Result<()> where W: Write, { @@ -1281,7 +1288,7 @@ impl KV { } } -impl KV { +impl DB { // pub(crate) async fn new_std_iterator( // &self, // opt: IteratorOptions, @@ -1331,8 +1338,8 @@ impl KV { self.inner.clone() } - pub(crate) fn new(inner: XArc) -> KV { - KV { inner } + pub(crate) fn new(inner: XArc) -> DB { + DB { inner } } pub(crate) fn to_ref(&self) -> &KVCore { diff --git a/src/kv_test.rs b/src/kv_test.rs index 56bb296..f4778e9 100644 --- a/src/kv_test.rs +++ b/src/kv_test.rs @@ -17,7 +17,7 @@ use crate::test_util::{push_log, remove_push_log, tracing_log}; use crate::types::{TArcMx, XArc}; use crate::value_log::{Entry, MetaBit, MAX_KEY_SIZE}; use crate::y::hex_str; -use crate::{kv::KVCore, options::Options, Error, KV}; +use crate::{kv::KVCore, options::Options, Error, DB}; fn get_test_option(dir: &str) -> Options { let mut opt = Options::default(); @@ -33,7 +33,7 @@ async fn t_1_write() { use crate::test_util::{random_tmp_dir, tracing_log}; tracing_log(); let dir = random_tmp_dir(); - let kv = KV::open(get_test_option(&dir)).await; + let kv = DB::open(get_test_option(&dir)).await; let kv = kv.unwrap(); let res = kv.set(b"hello".to_vec(), b"word".to_vec(), 10).await; assert!(res.is_ok()); @@ -47,7 +47,7 @@ async fn t_batch_write() { use crate::test_util::{random_tmp_dir, tracing_log}; tracing_log(); let dir = random_tmp_dir(); - let kv = KV::open(get_test_option(&dir)).await; + let kv = DB::open(get_test_option(&dir)).await; let kv = kv.unwrap(); let n = 50000; let mut batch = vec![]; @@ -136,7 +136,7 @@ async fn t_concurrent_write() { use crate::test_util::{random_tmp_dir, tracing_log}; tracing_log(); let dir = random_tmp_dir(); - let kv = KV::open(get_test_option(&dir)).await; + let kv = DB::open(get_test_option(&dir)).await; let kv = kv.unwrap(); let mut wg = awaitgroup::WaitGroup::new(); let n = 20; @@ -674,7 +674,7 @@ async fn t_delete_without_sync_write() { drop(kv); // Reopen kv, it should failed { - let kv = KV::open(opt).await.unwrap(); + let kv = DB::open(opt).await.unwrap(); let got = kv.get_with_ext(&key).await; assert!(got.unwrap_err().is_not_found()); } @@ -700,7 +700,7 @@ async fn t_kv_set_if_absent() { async fn t_kv_pid_file() { tracing_log(); let kv1 = build_kv().await; - let kv2 = KV::open(kv1.opt.clone()).await; + let kv2 = DB::open(kv1.opt.clone()).await; let err = kv2.unwrap_err(); assert!(err .to_string() @@ -848,7 +848,7 @@ async fn t_kv_dump() { .create(true) .open(back_tmp.clone()) .unwrap(); - kv.backup(fp).await.unwrap(); + kv.backup(fp, 0).await.unwrap(); kv.close().await.unwrap(); info!("backup: {:?}", back_tmp); } @@ -889,11 +889,11 @@ async fn t_kv_dump() { // ); // } -async fn build_kv() -> KV { +async fn build_kv() -> DB { use crate::test_util::random_tmp_dir; tracing_log(); let dir = random_tmp_dir(); - let kv = KV::open(get_test_option(&dir)).await; + let kv = DB::open(get_test_option(&dir)).await; let kv = kv.unwrap(); kv } From 1b6f146e0d17c6374f3cf9bb87a4a1567c2c9976 Mon Sep 17 00:00:00 2001 From: Rg Date: Mon, 23 Oct 2023 15:20:57 +0800 Subject: [PATCH 02/18] :dog: --- src/table/table.rs | 34 ++++------------------------------ 1 file changed, 4 insertions(+), 30 deletions(-) diff --git a/src/table/table.rs b/src/table/table.rs index e6daef4..23c9488 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -51,7 +51,6 @@ impl Display for KeyOffset { } pub type Table = XArc; -pub type WeakTable = XWeak; impl From for Table { fn from(value: TableCore) -> Self { @@ -164,38 +163,18 @@ impl TableCore { let mut tc = table_ref.to_inner().unwrap(); tc.biggest = biggest; tc.smallest = smallest; - // info!("open table ==> {}", tc); + Ok(tc) } // increments the refcount (having to do with whether the file should be deleted) pub(crate) fn incr_ref(&self) { - use std::backtrace::Backtrace; - let count = self._ref.fetch_add(1, Ordering::Release); - let buf = format!( - "incr {} table count {} => {}", - self.id, - count, - self.get_ref() - ); - // info!("{}", buf); - // - // info!( - // "BackTrace at table incr reference: {}", - // Backtrace::force_capture() - // ); - //push_log(buf.as_bytes(), false); + //use std::backtrace::Backtrace; + self._ref.fetch_add(1, Ordering::Release); } // decrements the refcount and possibly deletes the table pub(crate) fn decr_ref(&self) { - let count = self._ref.fetch_sub(1, Ordering::Release); - let buf = format!( - "decr {} table count {} => {}", - self.id, - count, - self.get_ref() - ); - //push_log(buf.as_bytes(), false); + self._ref.fetch_sub(1, Ordering::Release); } pub(crate) fn get_ref(&self) -> i32 { @@ -385,11 +364,6 @@ impl Drop for TableCore { impl Display for TableCore { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let index_str = self - .block_index - .iter() - .map(|x| format!("{}", x)) - .collect::>(); let smallest = hex_str(self.smallest()); let biggest = hex_str(self.biggest()); f.debug_struct("Table") From ec5e9a4dd5b4b55826e4e01011386b605408b228 Mon Sep 17 00:00:00 2001 From: Rg Date: Mon, 23 Oct 2023 19:12:51 +0800 Subject: [PATCH 03/18] :dog: --- Cargo.toml | 1 + src/iterator.rs | 12 ++++---- src/lib.rs | 1 + src/skl/skip.rs | 6 +++- src/table/builder.rs | 4 +-- src/transition.rs | 67 ++++++++++++++++++++++++++++++++++++++++++++ src/y/mod.rs | 60 ++++++++++++++++++++++++++++++++++++--- 7 files changed, 138 insertions(+), 13 deletions(-) create mode 100644 src/transition.rs diff --git a/Cargo.toml b/Cargo.toml index 5476cf9..ab1b5ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ backtrace-on-stack-overflow = "0.3.0" protobuf = { version = "3.0.0-alpha.2", features = ["with-bytes"] } clap = { version = "4.4.6", features = ["derive"] } structopt = "0.3.26" +farmhash = "1.1.5" [dev-dependencies] tracing-subscriber = "0.3.17" tracing-log = "0.1.3" diff --git a/src/iterator.rs b/src/iterator.rs index 6cbb0d1..60a449f 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -85,14 +85,14 @@ impl KVItem { // iterator.next() is called. #[derive(Clone)] pub(crate) struct KVItemInner { - status: Arc>, + pub(crate) status: Arc>, kv: DB, - key: Vec, + pub(crate) key: Vec, // TODO, Opz memory - vptr: Vec, - value: TArcMx>, - meta: u8, - user_meta: u8, + pub(crate) vptr: Vec, + pub(crate) value: TArcMx>, + pub(crate) meta: u8, + pub(crate) user_meta: u8, cas_counter: Arc, wg: Closer, err: Result<()>, diff --git a/src/lib.rs b/src/lib.rs index 34dc6c8..9ec82c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,6 +56,7 @@ mod st_manager; #[cfg(test)] mod test_util; mod backup; +mod transition; pub use iterator::*; pub use kv::*; diff --git a/src/skl/skip.rs b/src/skl/skip.rs index e44c433..e0e3a3b 100644 --- a/src/skl/skip.rs +++ b/src/skl/skip.rs @@ -1,6 +1,6 @@ use crate::skl::{Cursor, HEIGHT_INCREASE, MAX_HEIGHT}; use crate::table::iterator::IteratorItem; -use crate::y::ValueStruct; +use crate::y::{same_key_ignore_version, ValueStruct}; use crate::{Allocate, Xiterator}; use log::{info, warn}; @@ -375,6 +375,10 @@ impl SkipList { } if let Some(node) = node { let (offset, size) = node.get_value_offset(); + // diff key has same prefix + if !same_key_ignore_version(key, self.arena.get_key(node.key_offset, node.key_size)) { + return None; + } let value = self.arena.get_val(offset, size); #[cfg(test)] diff --git a/src/table/builder.rs b/src/table/builder.rs index af30a65..443b7fb 100644 --- a/src/table/builder.rs +++ b/src/table/builder.rs @@ -231,12 +231,12 @@ impl Default for Builder { fn default() -> Self { Self { counter: 0, - buf: Cursor::new(Vec::with_capacity(64 << 20)), + buf: Cursor::new(Vec::with_capacity(1 << 20)), base_key: vec![], base_offset: 0, restarts: vec![], prev_offset: u32::MAX, - key_buf: Cursor::new(Vec::with_capacity(32 << 20)), + key_buf: Cursor::new(Vec::with_capacity(1 << 20)), key_count: 0, } } diff --git a/src/transition.rs b/src/transition.rs new file mode 100644 index 0000000..84c68c2 --- /dev/null +++ b/src/transition.rs @@ -0,0 +1,67 @@ +use std::collections::HashMap; +use std::sync::Arc; +use crate::{Result, KVItem, WeakKV, ValueStruct, DB}; +use crate::iterator::{KVItemInner, PreFetchStatus}; +use crate::value_log::{Entry, MetaBit}; +use crate::y::compare_key; + +pub struct TxN { + read_ts: u64, + // update is used to conditionally keep track of reads. + update: bool, + // contains fingerprints of keys read. + reads: Arc>>, + // contains fingerprints of keys written. + writes: Arc>>, + // cache stores any writes done by txn. + pending_writes: Arc, Entry>>>, + + kv: WeakKV, +} + +impl TxN { + pub fn set(&self, key: Vec, value: Vec, user_meta: u8) { + let fp = farmhash::fingerprint64(&key); // Avoid dealing with byte arrays + self.writes.write().map(|mut writes| writes.push(fp)).unwrap(); + let entry = Entry::default().key(key.clone()).value(value).user_meta(user_meta); + self.pending_writes.write().map(|mut writes| writes.insert(key, entry)).unwrap(); + } + + pub fn delete(&self, key: &[u8]) { + let fp = farmhash::fingerprint64(&key); + self.writes.write().map(|mut writes| writes.push(fp)).unwrap(); + let entry = Entry::default().key(key.to_vec()).meta(MetaBit::BIT_DELETE.bits()); + self.pending_writes.write().map(|mut writes| writes.insert(key.to_vec(), entry)).unwrap(); + } + + pub async fn get(&self, key: &[u8]) -> Result { + let kv_db = self.kv.upgrade().unwrap(); + let kv_db = DB::new(kv_db); + let mut item = KVItem::from(KVItemInner::new(vec![], ValueStruct::default(), kv_db.clone())); + if self.update { + let pending_writes = self.pending_writes.write().unwrap(); + if let Some(e) = pending_writes.get(key) && &e.key == key { + { + let mut wl = item.wl().await; + wl.meta = e.meta; + *wl.value.lock().await = e.value.clone(); + wl.user_meta = e.user_meta; + wl.key = e.key.clone(); + wl.status.store(PreFetchStatus::Prefetched, std::sync::atomic::Ordering::SeqCst); + } + // We probably don't need to set KV on item here. + return Ok(item); + } + + let fp = farmhash::fingerprint64(key); + self.reads.write().map(|mut writes| writes.push(fp)).unwrap(); + } + + let seek = crate::y::key_with_ts(key, self.read_ts); + kv_db.get_with_ext(&seek).await + } + + pub async fn commit(&self) { + + } +} diff --git a/src/y/mod.rs b/src/y/mod.rs index 99fa580..95713cc 100644 --- a/src/y/mod.rs +++ b/src/y/mod.rs @@ -15,10 +15,11 @@ use std::collections::hash_map::DefaultHasher; use std::fs::{File, OpenOptions}; use std::hash::Hasher; -use std::io::{ErrorKind, Write}; +use std::io::{Cursor, ErrorKind, Write}; use std::backtrace::Backtrace; use std::{array, cmp, io}; +use byteorder::{WriteBytesExt, BigEndian, ReadBytesExt}; use thiserror::Error; use tracing::info; @@ -268,7 +269,7 @@ pub(crate) fn parallel_load_block_key(fp: File, offsets: Vec) -> Vec String { String::from_utf8(buf.to_vec()).unwrap_or_else(|_| "Sorry, Hex String Failed!!!".to_string()) } +// Generates a new key by appending ts to key. +pub(crate) fn key_with_ts(key: &[u8], ts: u64) -> Vec { + let mut out = vec![0u8; key.len() + 8]; + out.copy_from_slice(key); + out.write_u64::(u64::MAX - ts).unwrap(); + out +} + +#[inline(always)] +pub(crate) fn parse_ts(out: &[u8]) -> u64 { + if out.len() <= 8 { + return 0; + } + let mut cursor = Cursor::new(out); + u64::MAX - cursor.read_u64::().unwrap() +} + +#[inline(always)] +pub(crate) fn parse_key(out: &[u8]) -> &[u8] { + if out.is_empty() { + return out; + } + &out[..(out.len() - 8)] +} + +// checks for key equality ignoring the version timestamp suffix. +pub(crate) fn same_key_ignore_version(src: &[u8], dst: &[u8]) -> bool { + if src.len() != dst.len() { + return false; + } + let key_src = parse_key(src); + let key_dst = parse_key(dst); + key_src == key_dst +} + + +// compare_keys checks the key without timestamp and checks the timestamp if keyNoTs +// is same. +// a would be sorted higher than aa if we use bytes.compare +// All keys should have timestamp. +pub(crate) fn compare_key(key1: &[u8], key2: &[u8]) -> Ordering { + assert!(key1.len() > 8); + assert!(key2.len() > 8); + return match parse_key(&key1).cmp(&parse_key(key2)) { + Ordering::Equal => { + parse_ts(key1).cmp(&parse_ts(key2)) + } + other => other, + }; +} + #[cfg(any(target_os = "macos", target_os = "linux"))] #[test] fn dsync() { @@ -375,8 +427,8 @@ fn dsync() { /// find a value in array with binary search pub fn binary_search(array: &[T], f: F) -> Option -where - F: Fn(&T) -> Ordering, + where + F: Fn(&T) -> Ordering, { let mut low = 0; let mut high = array.len() - 1; From 41e6b9faab3e40ac977eb3612794ae271037d13b Mon Sep 17 00:00:00 2001 From: Rg Date: Wed, 25 Oct 2023 01:24:16 +0800 Subject: [PATCH 04/18] hello, :pig: --- src/transition.rs | 147 ++++++++++++++++++++++++++++++++++++++++++---- src/y/mod.rs | 4 ++ 2 files changed, 138 insertions(+), 13 deletions(-) diff --git a/src/transition.rs b/src/transition.rs index 84c68c2..5721c2f 100644 --- a/src/transition.rs +++ b/src/transition.rs @@ -1,10 +1,96 @@ -use std::collections::HashMap; -use std::sync::Arc; -use crate::{Result, KVItem, WeakKV, ValueStruct, DB}; use crate::iterator::{KVItemInner, PreFetchStatus}; use crate::value_log::{Entry, MetaBit}; use crate::y::compare_key; +use crate::{Error, KVItem, Result, ValueStruct, WeakKV, DB}; +use drop_cell::defer; +use parking_lot::RwLock; +use std::collections::{BinaryHeap, HashMap, HashSet}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +struct GlobalTxNStateInner { + lock: Arc>, + cur_read: AtomicU64, + next_commit: AtomicU64, + // These two structures are used to figure out when a commit is done. The minimum done commit is + // used to update cur_read. + commit_mark: BinaryHeap, + pending_commits: HashSet, + + // commits stores a key fingerprint and latest commit counter for it. + commits: HashMap, +} + +impl GlobalTxNStateInner { + fn read_ts(&self) -> u64 { + self.cur_read.load(Ordering::SeqCst) + } + + // must be called while having a lock. + fn has_conflict(&self, tx: &TxN) -> bool { + let tx_reads = tx.reads.write().unwrap(); + if tx_reads.is_empty() { + return false; + } + for ro in tx_reads.iter() { + if let Some(ts) = self.commits.get(ro) && *ts > tx.read_ts { + return true; + } + } + false + } + + fn new_commit_ts(&mut self, tx: &TxN) -> u64 { + let _lock = self.lock.write(); + + if self.has_conflict(tx) { + return 0; + } + + let ts = self.next_commit.load(Ordering::SeqCst); + let tx_writes = tx.writes.write().unwrap(); + for w in tx_writes.iter() { + // Update the commit_ts. + self.commits.insert(*w, ts); + } + self.commit_mark.push(ts); + if self.pending_commits.contains(&ts) { + panic!("We shouldn't have the commit ts: {}", ts); + } + + self.pending_commits.insert(ts); + self.next_commit.fetch_add(1, Ordering::SeqCst); + ts + } + + fn done_commit(&mut self, cts: u64) { + let _lock = self.lock.write(); + if !self.pending_commits.remove(&cts) { + panic!("We should already have the commit ts: {}", cts); + } + + let mut min = 0; + while !self.commit_mark.is_empty() { + let ts = self.commit_mark.peek().unwrap(); + if self.pending_commits.contains(ts) { + // Still waiting for a txn to commit. + break; + } + min = *ts; + self.commit_mark.pop(); + } + + if min == 0 { + return; + } + + self.cur_read.store(min, Ordering::SeqCst); + self.next_commit.store(min + 1, Ordering::SeqCst); + } +} + +#[derive(Clone)] pub struct TxN { read_ts: u64, // update is used to conditionally keep track of reads. @@ -17,27 +103,49 @@ pub struct TxN { pending_writes: Arc, Entry>>>, kv: WeakKV, + gs: Arc>, } impl TxN { pub fn set(&self, key: Vec, value: Vec, user_meta: u8) { let fp = farmhash::fingerprint64(&key); // Avoid dealing with byte arrays - self.writes.write().map(|mut writes| writes.push(fp)).unwrap(); - let entry = Entry::default().key(key.clone()).value(value).user_meta(user_meta); - self.pending_writes.write().map(|mut writes| writes.insert(key, entry)).unwrap(); + self.writes + .write() + .map(|mut writes| writes.push(fp)) + .unwrap(); + let entry = Entry::default() + .key(key.clone()) + .value(value) + .user_meta(user_meta); + self.pending_writes + .write() + .map(|mut writes| writes.insert(key, entry)) + .unwrap(); } pub fn delete(&self, key: &[u8]) { let fp = farmhash::fingerprint64(&key); - self.writes.write().map(|mut writes| writes.push(fp)).unwrap(); - let entry = Entry::default().key(key.to_vec()).meta(MetaBit::BIT_DELETE.bits()); - self.pending_writes.write().map(|mut writes| writes.insert(key.to_vec(), entry)).unwrap(); + self.writes + .write() + .map(|mut writes| writes.push(fp)) + .unwrap(); + let entry = Entry::default() + .key(key.to_vec()) + .meta(MetaBit::BIT_DELETE.bits()); + self.pending_writes + .write() + .map(|mut writes| writes.insert(key.to_vec(), entry)) + .unwrap(); } pub async fn get(&self, key: &[u8]) -> Result { let kv_db = self.kv.upgrade().unwrap(); let kv_db = DB::new(kv_db); - let mut item = KVItem::from(KVItemInner::new(vec![], ValueStruct::default(), kv_db.clone())); + let item = KVItem::from(KVItemInner::new( + vec![], + ValueStruct::default(), + kv_db.clone(), + )); if self.update { let pending_writes = self.pending_writes.write().unwrap(); if let Some(e) = pending_writes.get(key) && &e.key == key { @@ -54,14 +162,27 @@ impl TxN { } let fp = farmhash::fingerprint64(key); - self.reads.write().map(|mut writes| writes.push(fp)).unwrap(); + self.reads + .write() + .map(|mut writes| writes.push(fp)) + .unwrap(); } let seek = crate::y::key_with_ts(key, self.read_ts); kv_db.get_with_ext(&seek).await } - pub async fn commit(&self) { - + pub async fn commit(&self) -> Result<()> { + if self.writes.write().unwrap().is_empty() { + return Ok(()); // Ready only translation. + } + let commit_ts = self.gs.write().unwrap().new_commit_ts(&self); + if commit_ts == 0 { + return Err(Error::TxCommitConflict); + } + Ok(()) } } + +#[test] +fn it() {} diff --git a/src/y/mod.rs b/src/y/mod.rs index 95713cc..3a26ed0 100644 --- a/src/y/mod.rs +++ b/src/y/mod.rs @@ -93,6 +93,10 @@ pub enum Error { // GC #[error("Stop iteration")] StopGC, + + //////////////////////////////// + #[error("Transaction Conflict. Please retry.")] + TxCommitConflict, } impl Default for Error { From 1ecdb8b0763884f4b62b17961562e012a10c920d Mon Sep 17 00:00:00 2001 From: Rg Date: Tue, 31 Oct 2023 17:20:53 +0800 Subject: [PATCH 05/18] update --- Cargo.toml | 16 ++++++++-------- src/iterator.rs | 14 +++++++------- src/kv.rs | 8 ++++---- src/lib.rs | 1 + src/transition.rs | 2 +- src/value_log.rs | 15 ++++----------- 6 files changed, 25 insertions(+), 31 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ab1b5ef..4a24389 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,8 +14,8 @@ tokio = { version = "1.29.1", features = ["full", "tracing"] } byteorder = "1.4.3" rand = "0.8.5" maligned = "0.2.1" -atomic = "0.5.3" -tabled = { version = "0.12.2", features = ["ansi-str", "color"] } +atomic = "0.6.0" +tabled = { version = "0.14.0", features = ["ansi-str", "color"] } memmap = "0.7.0" bytes = "1.4.0" bloom = "0.3.2" @@ -30,7 +30,7 @@ parking_lot = "0.12.1" bitflags = "2.3.3" libc = "0.2.147" log = { version = "0.4.19", features = ["kv_unstable", "kv_unstable_serde", "kv_unstable_sval"] } -async-channel = "1.9.0" +async-channel = "2.0.0" file-guard = "0.1.0" fs2 = "0.4.3" awaitgroup = "0.7.0" @@ -45,13 +45,13 @@ eieio = "1.0.0" either = "1.8.1" enum-unitary = "0.5.0" atom_box = "0.1.2" -console-subscriber = "0.1.10" +console-subscriber = "0.2.0" uuid = { version = "1.4.1", features = ["v5", "v4"] } winapi = "0.3.9" itertools = "0.11.0" -tokio-metrics = "0.2.2" +tokio-metrics = "0.3.1" metrics = "0.21.1" -metrics-prometheus = "0.4.1" +metrics-prometheus = "0.5.0" prometheus = "0.13.3" lazy_static = "1.4.0" getset = "0.1.2" @@ -65,12 +65,12 @@ structopt = "0.3.26" farmhash = "1.1.5" [dev-dependencies] tracing-subscriber = "0.3.17" -tracing-log = "0.1.3" +tracing-log = "0.2.0" chrono = "0.4.26" env_logger = "0.10.0" console_log = { version = "1.0.0", features = ["color"] } itertools = "0.11.0" -tokio-metrics = { version = "0.2.2", default-features = false } +tokio-metrics = { version = "0.3.1", default-features = false } tokio = { version = "1.29.1", features = ["full", "rt", "time", "macros", "test-util"] } criterion = { version = "0.5.1", features = ["tokio"] } diff --git a/src/iterator.rs b/src/iterator.rs index 60a449f..cbc9e57 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -9,10 +9,10 @@ use crate::{ use atomic::Atomic; -use std::fmt::{Debug, Display, Formatter, Pointer}; +use std::fmt::{Debug, Display, Formatter}; use std::future::Future; -use std::pin::{pin, Pin}; +use std::pin::Pin; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -85,7 +85,7 @@ impl KVItem { // iterator.next() is called. #[derive(Clone)] pub(crate) struct KVItemInner { - pub(crate) status: Arc>, + pub(crate) status: Arc>, kv: DB, pub(crate) key: Vec, // TODO, Opz memory @@ -123,7 +123,7 @@ impl Debug for KVItemInner { impl KVItemInner { pub(crate) fn new(key: Vec, value: ValueStruct, kv: DB) -> KVItemInner { Self { - status: Arc::new(Atomic::new(PreFetchStatus::Empty)), + status: Arc::new(std::sync::RwLock::new(PreFetchStatus::Empty)), kv, key, value: TArcMx::new(Default::default()), @@ -167,7 +167,7 @@ impl KVItemInner { ) -> Result<()> { // Wait result self.wg.wait().await; - if self.status.load(Ordering::Acquire) == Prefetched { + if *self.status.read().unwrap() == Prefetched { if self.err.is_err() { return self.err.clone(); } @@ -199,7 +199,7 @@ impl KVItemInner { let value = value.to_vec(); let value_wl = self.value.clone(); Box::pin(async move { - status_wl.store(Prefetched, Ordering::Release); + *status_wl.write().unwrap() = Prefetched; if value.is_empty() { return Ok(()); } @@ -515,7 +515,7 @@ impl IteratorExt { fn new_item(&self) -> KVItem { let inner_item = KVItemInner { - status: Arc::new(Atomic::new(PreFetchStatus::Empty)), + status: Arc::new(std::sync::RwLock::new(PreFetchStatus::Empty)), kv: self.kv.clone(), key: vec![], value: TArcMx::new(Default::default()), diff --git a/src/kv.rs b/src/kv.rs index 086300e..173519b 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -386,7 +386,7 @@ impl KVCore { count += 1; sz += self.opt.estimate_size(&entry) as u64; req.entries.push(EntryType::from(entry)); - req.ptrs.push(Arc::new(Atomic::new(None))); + req.ptrs.push(Arc::new(std::sync::RwLock::new(None))); req_index.push(i); } @@ -496,7 +496,7 @@ impl KVCore { // Will include deletion/tombstone case. debug!("Lsm ok, the value not at vlog file"); } else { - let ptr = req.ptrs.get(i).unwrap().load(Ordering::Relaxed); + let ptr = req.ptrs.get(i).unwrap().read().unwrap(); let ptr = ptr.unwrap(); let mut wt = Cursor::new(vec![0u8; ValuePointer::value_pointer_encoded_size()]); ptr.enc(&mut wt).unwrap(); @@ -571,12 +571,12 @@ impl KVCore { Ok(()) } - async fn update_offset(&self, ptrs: &mut Vec>>>) { + async fn update_offset(&self, ptrs: &mut Vec>>>) { // #[cfg(test)] // warn!("Ready to update offset"); let mut ptr = ValuePointer::default(); for tmp_ptr in ptrs.iter().rev() { - let tmp_ptr = tmp_ptr.load(Ordering::Acquire); + let tmp_ptr = tmp_ptr.read().unwrap(); if tmp_ptr.is_none() || tmp_ptr.unwrap().is_zero() { continue; } diff --git a/src/lib.rs b/src/lib.rs index 9ec82c3..f6874c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,7 @@ #![feature(binary_heap_into_iter_sorted)] #![feature(test)] #![feature(atomic_from_ptr, pointer_is_aligned)] +#![feature(unboxed_closures)] /// Badger DB is an embedded keyvalue database. diff --git a/src/transition.rs b/src/transition.rs index 5721c2f..1f412b5 100644 --- a/src/transition.rs +++ b/src/transition.rs @@ -155,7 +155,7 @@ impl TxN { *wl.value.lock().await = e.value.clone(); wl.user_meta = e.user_meta; wl.key = e.key.clone(); - wl.status.store(PreFetchStatus::Prefetched, std::sync::atomic::Ordering::SeqCst); + *wl.status.write().unwrap() = PreFetchStatus::Prefetched; } // We probably don't need to set KV on item here. return Ok(item); diff --git a/src/value_log.rs b/src/value_log.rs index 8b27287..9da324c 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -28,6 +28,7 @@ use std::pin::Pin; use std::sync::atomic::{AtomicI32, AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; +use std::num::NonZeroU32; use std::time::{Duration, SystemTime}; use std::{fmt, fs, io, ptr}; @@ -373,7 +374,7 @@ pub struct Request { // Input values pub(crate) entries: Vec, // Output Values and wait group stuff below - pub(crate) ptrs: Vec>>>, + pub(crate) ptrs: Vec>>>, } impl Default for Request { @@ -417,14 +418,6 @@ impl Request { .map(|ty| ty.fut_ch.clone()) .collect::>() } - - fn get_ptrs(&self) -> Vec>>> { - self.ptrs.clone() - } - - fn get_ptr(&self, i: usize) -> Option<&Arc>>> { - self.ptrs.get(i) - } } pub struct ValueLogCore { @@ -759,7 +752,7 @@ impl ValueLogCore { if !self.opt.sync_writes && entry.entry().value.len() < self.opt.value_threshold { // No need to write to value log. // WARN: if mt not flush into disk but process abort, that will discard data(the data not write into vlog that WAL file) - req.ptrs[idx] = Arc::new(Atomic::new(None)); + req.ptrs[idx] = Arc::new(std::sync::RwLock::new(None)); continue; } @@ -786,7 +779,7 @@ impl ValueLogCore { buf.get_ref().len() + self.writable_log_offset.load(Ordering::Acquire) as usize, ptr.offset as usize + sz ); - req.ptrs[idx].store(Some(ptr), Ordering::Release); + req.ptrs[idx].write().unwrap().replace(ptr); } } { From 688335ffe54ec9223545c10b8d4821b97f35b739 Mon Sep 17 00:00:00 2001 From: Rg Date: Tue, 31 Oct 2023 17:25:21 +0800 Subject: [PATCH 06/18] pass test --- src/skl/skip.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/skl/skip.rs b/src/skl/skip.rs index e0e3a3b..3f71af9 100644 --- a/src/skl/skip.rs +++ b/src/skl/skip.rs @@ -3,13 +3,13 @@ use crate::table::iterator::IteratorItem; use crate::y::{same_key_ignore_version, ValueStruct}; use crate::{Allocate, Xiterator}; +use drop_cell::defer; use log::{info, warn}; use rand::random; use std::fmt::{Debug, Display, Formatter}; use std::sync::atomic::{AtomicPtr, AtomicU32, Ordering}; use std::sync::Arc; use std::{cmp, ptr, sync::atomic::AtomicI32}; -use drop_cell::defer; use uuid::Uuid; use super::{arena::Arena, node::Node}; @@ -376,9 +376,9 @@ impl SkipList { if let Some(node) = node { let (offset, size) = node.get_value_offset(); // diff key has same prefix - if !same_key_ignore_version(key, self.arena.get_key(node.key_offset, node.key_size)) { - return None; - } + // if !same_key_ignore_version(key, self.arena.get_key(node.key_offset, node.key_size)) { + // return None; + // } let value = self.arena.get_val(offset, size); #[cfg(test)] From 1d74be8ea0c55aca21509f4fb84c6c41e6058320 Mon Sep 17 00:00:00 2001 From: Rg Date: Wed, 1 Nov 2023 01:18:30 +0800 Subject: [PATCH 07/18] :cat: --- src/transition.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/transition.rs b/src/transition.rs index 1f412b5..ad4c856 100644 --- a/src/transition.rs +++ b/src/transition.rs @@ -92,12 +92,13 @@ impl GlobalTxNStateInner { #[derive(Clone)] pub struct TxN { + // the read operation ts, it also was the version of key. read_ts: u64, // update is used to conditionally keep track of reads. update: bool, // contains fingerprints of keys read. reads: Arc>>, - // contains fingerprints of keys written. + // contains fingerprints(hash64(key)) of keys written. writes: Arc>>, // cache stores any writes done by txn. pending_writes: Arc, Entry>>>, @@ -107,6 +108,7 @@ pub struct TxN { } impl TxN { + // set a key, value with user meta at the tx pub fn set(&self, key: Vec, value: Vec, user_meta: u8) { let fp = farmhash::fingerprint64(&key); // Avoid dealing with byte arrays self.writes @@ -123,6 +125,8 @@ impl TxN { .unwrap(); } + // delete a key with at the tx + pub fn delete(&self, key: &[u8]) { let fp = farmhash::fingerprint64(&key); self.writes @@ -146,6 +150,7 @@ impl TxN { ValueStruct::default(), kv_db.clone(), )); + // if the transaction is writeable, Prioritize reading local writes if self.update { let pending_writes = self.pending_writes.write().unwrap(); if let Some(e) = pending_writes.get(key) && &e.key == key { @@ -161,13 +166,14 @@ impl TxN { return Ok(item); } + // let fp = farmhash::fingerprint64(key); self.reads .write() .map(|mut writes| writes.push(fp)) .unwrap(); } - + // Seek from DB let seek = crate::y::key_with_ts(key, self.read_ts); kv_db.get_with_ext(&seek).await } From 11ee79d0a159e45bc924f1bd55ec493a1be93c5c Mon Sep 17 00:00:00 2001 From: Rg Date: Thu, 16 Nov 2023 20:05:09 +0800 Subject: [PATCH 08/18] :dog: --- src/iterator.rs | 29 ++++++++++--------- src/kv.rs | 15 ++++++---- src/lib.rs | 5 ++-- src/transition.rs | 72 +++++++++++++++++++++++++++++++++++++++++++---- src/value_log.rs | 4 ++- 5 files changed, 98 insertions(+), 27 deletions(-) diff --git a/src/iterator.rs b/src/iterator.rs index cbc9e57..f4b62a6 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -14,6 +14,7 @@ use std::future::Future; use std::pin::Pin; +use crate::transition::TxN; use std::sync::atomic::Ordering; use std::sync::Arc; use std::{io::Cursor, sync::atomic::AtomicU64}; @@ -38,13 +39,6 @@ impl From for KVItem { } } } -// impl Deref for KVItem { -// type Target = tokio::sync::RwLock; -// -// fn deref(&self) -> &Self::Target { -// self.inner.as_ref() -// } -// } impl KVItem { pub async fn key(&self) -> Vec { @@ -62,6 +56,8 @@ impl KVItem { inner.has_value() } + /// Returns the CAS counter associated with the value. + /// TODO: Make this version. pub async fn counter(&self) -> u64 { let inner = self.rl().await; inner.counter() @@ -93,6 +89,7 @@ pub(crate) struct KVItemInner { pub(crate) value: TArcMx>, pub(crate) meta: u8, pub(crate) user_meta: u8, + // TODO: rename to version ts. cas_counter: Arc, wg: Closer, err: Result<()>, @@ -138,7 +135,7 @@ impl KVItemInner { // Returns the key. Remember to copy if you need to access it outside the iteration loop. pub(crate) fn key(&self) -> &[u8] { - &self.key + crate::y::parse_key(self.key.as_ref()) } // Return value @@ -287,7 +284,10 @@ pub(crate) const DEF_ITERATOR_OPTIONS: IteratorOptions = IteratorOptions { /// | | | /// IteratorExt reference pub struct IteratorExt { - kv: DB, + txn: TxN, + read_ts: u64, + + // kv: DB, itr: MergeIterator, opt: IteratorOptions, item: ArcRW>, @@ -330,14 +330,15 @@ pub struct IteratorExt { impl IteratorExt { pub(crate) fn new(kv: DB, itr: MergeIterator, opt: IteratorOptions) -> IteratorExt { - IteratorExt { + /*IteratorExt { kv, opt, itr, data: ArcRW::default(), item: Arc::new(Default::default()), has_rewind: ArcRW::default(), - } + }*/ + todo!() } // pub(crate) async fn new_async_iterator( @@ -444,7 +445,8 @@ impl IteratorExt { // Close the iterator, It is important to call this when you're done with iteration. pub async fn close(&self) -> Result<()> { // TODO: We could handle this error. - self.kv.vlog.as_ref().unwrap().decr_iterator_count().await?; + let db = self.txn.get_kv(); + db.vlog.as_ref().unwrap().decr_iterator_count().await?; Ok(()) } @@ -514,9 +516,10 @@ impl IteratorExt { } fn new_item(&self) -> KVItem { + let kv = self.txn.get_kv(); let inner_item = KVItemInner { status: Arc::new(std::sync::RwLock::new(PreFetchStatus::Empty)), - kv: self.kv.clone(), + kv, key: vec![], value: TArcMx::new(Default::default()), vptr: vec![], diff --git a/src/kv.rs b/src/kv.rs index 173519b..57d17dd 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -46,10 +46,12 @@ use tokio::fs::create_dir_all; use tokio::io::AsyncWriteExt; use tokio::sync::{RwLock, RwLockWriteGuard}; -/// -pub const _BADGER_PREFIX: &[u8; 8] = b"!badger!"; /// Prefix for internal keys used by badger. -pub const _HEAD: &[u8; 12] = b"!badger!head"; // For Storing value offset for replay. +pub const _BADGER_PREFIX: &[u8; 8] = b"!badger!"; +// For storing value offset for replay. +pub const _HEAD: &[u8; 12] = b"!badger!head"; +/// For the indicating end of entries in txn. +pub const TXN_KEY: &[u8; 11] = b"!badger!txn"; pub const KV_WRITE_CH_CAPACITY: usize = 1000; @@ -596,7 +598,10 @@ impl KVCore { } // Returns the current `mem_tables` and get references(here will incr mem table reference). - fn get_mem_tables<'a>(&'a self, p: &'a crossbeam_epoch::Guard) -> Vec> { + pub(crate) fn get_mem_tables<'a>( + &'a self, + p: &'a crossbeam_epoch::Guard, + ) -> Vec> { self.mem_st_manger.lock_exclusive(); defer! {self.mem_st_manger.unlock_exclusive()} @@ -652,7 +657,7 @@ impl KVCore { unsafe { &*st } } - fn must_vlog(&self) -> Arc { + pub(crate) fn must_vlog(&self) -> Arc { let vlog = self.vlog.clone().unwrap(); vlog } diff --git a/src/lib.rs b/src/lib.rs index f6874c0..ef880fb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,7 +24,6 @@ #![feature(atomic_from_ptr, pointer_is_aligned)] #![feature(unboxed_closures)] - /// Badger DB is an embedded keyvalue database. /// /// Badger DB is a library written in Rust that implements a badger-go [https://github.com/dgraph-io/badger] @@ -49,6 +48,7 @@ mod y; mod compaction; // #[cfg(test)] // mod kv_test; +mod backup; #[cfg(test)] mod kv_test; mod levels; @@ -56,7 +56,6 @@ mod pb; mod st_manager; #[cfg(test)] mod test_util; -mod backup; mod transition; pub use iterator::*; @@ -78,4 +77,4 @@ pub(crate) fn must_align(ptr: *const T) { pub(crate) fn cals_size_with_align(sz: usize, align_sz: usize) -> usize { let size = (sz + align_sz) & !align_sz; size -} \ No newline at end of file +} diff --git a/src/transition.rs b/src/transition.rs index ad4c856..3336d63 100644 --- a/src/transition.rs +++ b/src/transition.rs @@ -1,7 +1,11 @@ use crate::iterator::{KVItemInner, PreFetchStatus}; +use crate::table::iterator::IteratorItem; use crate::value_log::{Entry, MetaBit}; use crate::y::compare_key; -use crate::{Error, KVItem, Result, ValueStruct, WeakKV, DB}; +use crate::{ + Error, IteratorExt, IteratorOptions, KVItem, MergeIterOverBuilder, Result, UniIterator, + ValueStruct, WeakKV, Xiterator, DB, TXN_KEY, +}; use drop_cell::defer; use parking_lot::RwLock; use std::collections::{BinaryHeap, HashMap, HashSet}; @@ -34,7 +38,9 @@ impl GlobalTxNStateInner { } for ro in tx_reads.iter() { - if let Some(ts) = self.commits.get(ro) && *ts > tx.read_ts { + if let Some(ts) = self.commits.get(ro) + && *ts > tx.read_ts + { return true; } } @@ -126,7 +132,6 @@ impl TxN { } // delete a key with at the tx - pub fn delete(&self, key: &[u8]) { let fp = farmhash::fingerprint64(&key); self.writes @@ -153,7 +158,10 @@ impl TxN { // if the transaction is writeable, Prioritize reading local writes if self.update { let pending_writes = self.pending_writes.write().unwrap(); - if let Some(e) = pending_writes.get(key) && &e.key == key { + if let Some(e) = pending_writes.get(key) + && &e.key == key + { + // Fulfill from cache. { let mut wl = item.wl().await; wl.meta = e.meta; @@ -166,7 +174,7 @@ impl TxN { return Ok(item); } - // + // Notice: if not found at local writes, need to read operation because they key seek from DB below. let fp = farmhash::fingerprint64(key); self.reads .write() @@ -186,8 +194,62 @@ impl TxN { if commit_ts == 0 { return Err(Error::TxCommitConflict); } + + let mut entries = vec![]; + let mut pending_writes = self.pending_writes.write().unwrap(); + for e in pending_writes.values_mut().into_iter() { + // Suffix the keys with commit ts, so the key versions are sorted in descending order of commit timestamp. + // descending order of commit timestamp. + let ts_key = crate::y::key_with_ts(&e.key, commit_ts); + e.key.clear(); + e.key.extend_from_slice(&ts_key); + entries.push(e.clone()); + } + // TODO: Add logic in replay to deal with this. + let entry = Entry::default() + .key(TXN_KEY.to_vec()) + .value(commit_ts.to_string().as_bytes().to_vec()) + .meta(MetaBit::BIT_FIN_TXN.bits()); + entries.push(entry); + let db = self.get_kv(); + let res = db.batch_set(entries).await; + + for ret in res { + ret?; + } + Ok(()) } + + fn new_iterator(&self, opt: IteratorOptions) -> IteratorExt { + let db = self.get_kv(); + // Notice, the iterator is global iterator, so must incr reference for memtable(SikpList), sst(file), vlog(file). + let p = crossbeam_epoch::pin(); + let tables = db.get_mem_tables(&p); + // TODO it should decr at IteratorExt close. + defer! { + tables.iter().for_each(|table| unsafe {table.as_ref().unwrap().decr_ref()}); + } + // add vlog reference. + db.must_vlog().incr_iterator_count(); + + // Create iterators across all the tables involved first. + let mut itrs: Vec>> = vec![]; + for tb in tables.clone() { + let st = unsafe { tb.as_ref().unwrap().clone() }; + let iter = Box::new(UniIterator::new(st, opt.reverse)); + itrs.push(iter); + } + // Extend sst.table + itrs.extend(db.must_lc().as_iterator(opt.reverse)); + let mitr = MergeIterOverBuilder::default().add_batch(itrs).build(); + IteratorExt::new(db.clone(), mitr, opt) + } + + pub(crate) fn get_kv(&self) -> DB { + let kv_db = self.kv.upgrade().unwrap(); + DB::new(kv_db) + } } #[test] diff --git a/src/value_log.rs b/src/value_log.rs index 9da324c..f5e8d39 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -54,7 +54,9 @@ bitflags! { const BIT_UNUSED = 4; /// Set if the key is set using SetIfAbsent. const BIT_SET_IF_ABSENT = 8; - } + /// Set if the entry is to indicate end of txn in value log. + const BIT_FIN_TXN = 16; + } } const M: u64 = 1 << 20; From 1125efc2117725db5b53ef677d019b01e257e5d6 Mon Sep 17 00:00:00 2001 From: Rg Date: Fri, 17 Nov 2023 01:17:10 +0800 Subject: [PATCH 09/18] hello, :pig: --- src/iterator.rs | 12 ++++++------ src/kv.rs | 16 ++++++++++++++++ src/skl/skip.rs | 6 +++--- src/transition.rs | 42 +++++++++++++++++++++++++++++++++--------- 4 files changed, 58 insertions(+), 18 deletions(-) diff --git a/src/iterator.rs b/src/iterator.rs index f4b62a6..0399390 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -14,7 +14,7 @@ use std::future::Future; use std::pin::Pin; -use crate::transition::TxN; +use crate::transition::{BoxTxN, TxN}; use std::sync::atomic::Ordering; use std::sync::Arc; use std::{io::Cursor, sync::atomic::AtomicU64}; @@ -284,14 +284,14 @@ pub(crate) const DEF_ITERATOR_OPTIONS: IteratorOptions = IteratorOptions { /// | | | /// IteratorExt reference pub struct IteratorExt { - txn: TxN, + txn: BoxTxN, read_ts: u64, // kv: DB, itr: MergeIterator, opt: IteratorOptions, item: ArcRW>, - // Cache the prefetch keys, not inlcude current value + // Cache the prefetch keys, not include current value data: ArcRW>, has_rewind: ArcRW, } @@ -329,7 +329,7 @@ pub struct IteratorExt { // } impl IteratorExt { - pub(crate) fn new(kv: DB, itr: MergeIterator, opt: IteratorOptions) -> IteratorExt { + pub(crate) fn new(tv: *const TxN, itr: MergeIterator, opt: IteratorOptions) -> IteratorExt { /*IteratorExt { kv, opt, @@ -445,7 +445,7 @@ impl IteratorExt { // Close the iterator, It is important to call this when you're done with iteration. pub async fn close(&self) -> Result<()> { // TODO: We could handle this error. - let db = self.txn.get_kv(); + let db = self.txn.tx.get_kv(); db.vlog.as_ref().unwrap().decr_iterator_count().await?; Ok(()) } @@ -516,7 +516,7 @@ impl IteratorExt { } fn new_item(&self) -> KVItem { - let kv = self.txn.get_kv(); + let kv = self.txn.tx.get_kv(); let inner_item = KVItemInner { status: Arc::new(std::sync::RwLock::new(PreFetchStatus::Empty)), kv, diff --git a/src/kv.rs b/src/kv.rs index 57d17dd..f81bffe 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -42,9 +42,11 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime}; use std::{fmt, string, vec}; +use structopt::clap::AppSettings::WaitOnError; use tokio::fs::create_dir_all; use tokio::io::AsyncWriteExt; use tokio::sync::{RwLock, RwLockWriteGuard}; +use crate::transition::{GlobalTxNState, TxN}; /// Prefix for internal keys used by badger. pub const _BADGER_PREFIX: &[u8; 8] = b"!badger!"; @@ -714,6 +716,7 @@ pub type WeakKV = XWeak; #[derive(Clone)] pub struct DB { inner: XArc, + gs: GlobalTxNState, } impl Deref for DB { @@ -1199,6 +1202,19 @@ impl DB { let mitr = MergeIterOverBuilder::default().add_batch(itrs).build(); IteratorExt::new(self.clone(), mitr, opt) } + + pub fn new_transaction(&self, update: bool) -> Result { + Ok(TxN{ + read_ts: 0, + update, + reads: Arc::new(Default::default()), + writes: Arc::new(Default::default()), + pending_writes: Arc::new(Default::default()), + kv: self.clone(), + gs: self.gs.clone(), + }) + } + /// Closes a KV. It's crucial to call it to ensure all the pending updates /// make their way to disk. pub async fn close(&self) -> Result<()> { diff --git a/src/skl/skip.rs b/src/skl/skip.rs index 3f71af9..000393d 100644 --- a/src/skl/skip.rs +++ b/src/skl/skip.rs @@ -376,9 +376,9 @@ impl SkipList { if let Some(node) = node { let (offset, size) = node.get_value_offset(); // diff key has same prefix - // if !same_key_ignore_version(key, self.arena.get_key(node.key_offset, node.key_size)) { - // return None; - // } + if !same_key_ignore_version(key, self.arena.get_key(node.key_offset, node.key_size)) { + return None; + } let value = self.arena.get_val(offset, size); #[cfg(test)] diff --git a/src/transition.rs b/src/transition.rs index 3336d63..b6be8e1 100644 --- a/src/transition.rs +++ b/src/transition.rs @@ -1,5 +1,6 @@ use crate::iterator::{KVItemInner, PreFetchStatus}; use crate::table::iterator::IteratorItem; +use crate::types::TArcRW; use crate::value_log::{Entry, MetaBit}; use crate::y::compare_key; use crate::{ @@ -12,7 +13,13 @@ use std::collections::{BinaryHeap, HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +#[derive(Clone)] +pub struct GlobalTxNState { + inner: TArcRW, +} + struct GlobalTxNStateInner { + // TODO may be not lock, because outline has locked a time lock: Arc>, cur_read: AtomicU64, next_commit: AtomicU64, @@ -99,18 +106,18 @@ impl GlobalTxNStateInner { #[derive(Clone)] pub struct TxN { // the read operation ts, it also was the version of key. - read_ts: u64, + pub(crate) read_ts: u64, // update is used to conditionally keep track of reads. - update: bool, + pub(crate) update: bool, // contains fingerprints of keys read. - reads: Arc>>, + pub(crate) reads: Arc>>, // contains fingerprints(hash64(key)) of keys written. - writes: Arc>>, + pub(crate) writes: Arc>>, // cache stores any writes done by txn. - pending_writes: Arc, Entry>>>, + pub(crate) pending_writes: Arc, Entry>>>, - kv: WeakKV, - gs: Arc>, + pub(crate) kv: DB, + pub(crate) gs: GlobalTxNState, } impl TxN { @@ -190,7 +197,9 @@ impl TxN { if self.writes.write().unwrap().is_empty() { return Ok(()); // Ready only translation. } - let commit_ts = self.gs.write().unwrap().new_commit_ts(&self); + // TODO FIXME lock + let mut gs = self.gs.inner.write().await; + let commit_ts = gs.new_commit_ts(&self); if commit_ts == 0 { return Err(Error::TxCommitConflict); } @@ -243,7 +252,8 @@ impl TxN { // Extend sst.table itrs.extend(db.must_lc().as_iterator(opt.reverse)); let mitr = MergeIterOverBuilder::default().add_batch(itrs).build(); - IteratorExt::new(db.clone(), mitr, opt) + let txn = self as *const TxN; + IteratorExt::new(txn, mitr, opt) } pub(crate) fn get_kv(&self) -> DB { @@ -252,5 +262,19 @@ impl TxN { } } +pub(crate) struct BoxTxN { + pub tx: *const TxN, +} + +unsafe impl Send for BoxTxN {} + +unsafe impl Sync for BoxTxN {} + +impl BoxTxN { + pub(crate) fn new(tx: *const TxN) -> BoxTxN { + BoxTxN { tx } + } +} + #[test] fn it() {} From e0ed242a59be694cf36f241599ed01c453bbc027 Mon Sep 17 00:00:00 2001 From: Rg Date: Fri, 17 Nov 2023 22:20:28 +0800 Subject: [PATCH 10/18] :dog: --- src/iterator.rs | 53 +++++++++++++++++++++++++++++++++++++++++------ src/kv.rs | 22 +++++++++----------- src/transition.rs | 27 +++++++++++++++++------- 3 files changed, 76 insertions(+), 26 deletions(-) diff --git a/src/iterator.rs b/src/iterator.rs index 0399390..3393142 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -294,6 +294,7 @@ pub struct IteratorExt { // Cache the prefetch keys, not include current value data: ArcRW>, has_rewind: ArcRW, + last_key: ArcRW>, } /// TODO FIXME @@ -377,11 +378,11 @@ impl IteratorExt { } // rewind the iterator // rewind, next, rewind?, thie item is who! - let mut item = self.itr.rewind(); + let mut _item = self.itr.rewind(); // filter internal data - while item.is_some() && item.as_ref().unwrap().key().starts_with(_BADGER_PREFIX) { - item = self.itr.next(); - } + //while item.is_some() && item.as_ref().unwrap().key().starts_with(_BADGER_PREFIX) { + // item = self.itr.next(); + //} // Before every rewind, the item will be reset to None self.item.write().take(); // prefetch item. @@ -445,7 +446,7 @@ impl IteratorExt { // Close the iterator, It is important to call this when you're done with iteration. pub async fn close(&self) -> Result<()> { // TODO: We could handle this error. - let db = self.txn.tx.get_kv(); + let db = unsafe { self.txn.tx.as_ref().unwrap().get_kv() }; db.vlog.as_ref().unwrap().decr_iterator_count().await?; Ok(()) } @@ -516,7 +517,7 @@ impl IteratorExt { } fn new_item(&self) -> KVItem { - let kv = self.txn.tx.get_kv(); + let kv = self.txn().get_kv(); let inner_item = KVItemInner { status: Arc::new(std::sync::RwLock::new(PreFetchStatus::Empty)), kv, @@ -532,8 +533,48 @@ impl IteratorExt { return KVItem::from(inner_item); } + // The is a complex function because it needs to handle both forward and reverse iteration + // implementation. We store keys such that their version are sorted in descending order. This makes + // forward iteration efficient, but revese iteration complecated. This tradeoff is better because + // forward iteration is more common than reverse. + // + // This function advances the iterator. + fn parse_item(&self) -> bool { + let itr = self.itr; + let item = itr.peek().unwrap(); + // Skip bager keys. + if item.key().starts_with(_BADGER_PREFIX) { + itr.next(); + return false; + } + // Skip any version which are beyond the read_ts + let ver = crate::y::parse_ts(item.key()); + if ver > self.read_ts { + itr.next(); + return false; + } + // If iteration in forward direction. then just checking the last key against current key would + // be sufficient. + if !self.opt.reverse { + if crate::y::same_key_ignore_version(self.last_key.write().as_ref(), item.key()) { + itr.next(); + return false; + } + // Only track in froward direction. + // We should update last_key as soon as we find a different key in our snapshot. + // Consider keys: a 5, b 7 (del), b 5. When iterating, last_key = a. + // Then we see b 7, which is deleted. If we don't store last_key = b, we'll then return b 5, + // + } + false + } + // Returns false when iteration is done. fn valid(&self) -> bool { self.item.read().is_some() } + + fn txn(&self) -> &TxN { + unsafe { self.txn.tx.as_ref().unwrap() } + } } diff --git a/src/kv.rs b/src/kv.rs index f81bffe..c59d0f0 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -34,6 +34,7 @@ use std::path::Path; use std::pin::Pin; use crate::pb::backup::KVPair; +use crate::transition::{GlobalTxNState, TxN}; use libc::difftime; use rand::random; use std::fmt::Formatter; @@ -46,7 +47,6 @@ use structopt::clap::AppSettings::WaitOnError; use tokio::fs::create_dir_all; use tokio::io::AsyncWriteExt; use tokio::sync::{RwLock, RwLockWriteGuard}; -use crate::transition::{GlobalTxNState, TxN}; /// Prefix for internal keys used by badger. pub const _BADGER_PREFIX: &[u8; 8] = b"!badger!"; @@ -79,12 +79,6 @@ impl FlushTask { } } -/// A builder for KV building. -pub struct KVBuilder { - opt: Options, - kv: BoxKV, -} - /// Manage key/value #[doc(hidden)] #[derive(Clone)] @@ -111,6 +105,7 @@ pub struct KVCore { // we use an atomic op. pub(crate) last_used_cas_counter: Arc, share_lock: TArcRW<()>, + pub(crate) txn_state: GlobalTxNState, } impl Drop for KVCore { @@ -716,7 +711,6 @@ pub type WeakKV = XWeak; #[derive(Clone)] pub struct DB { inner: XArc, - gs: GlobalTxNState, } impl Deref for DB { @@ -791,6 +785,7 @@ impl DB { last_used_cas_counter: Arc::new(AtomicU64::new(1)), mem_st_manger: Arc::new(SkipListManager::new(opt.arena_size() as usize)), share_lock: TArcRW::new(tokio::sync::RwLock::new(())), + txn_state: GlobalTxNState::default(), }; let manifest = out.manifest.clone(); @@ -1200,18 +1195,18 @@ impl DB { // Extend sst.table itrs.extend(self.must_lc().as_iterator(opt.reverse)); let mitr = MergeIterOverBuilder::default().add_batch(itrs).build(); - IteratorExt::new(self.clone(), mitr, opt) + + IteratorExt::new(std::ptr::null(), mitr, opt) } pub fn new_transaction(&self, update: bool) -> Result { - Ok(TxN{ + Ok(TxN { read_ts: 0, update, reads: Arc::new(Default::default()), writes: Arc::new(Default::default()), pending_writes: Arc::new(Default::default()), kv: self.clone(), - gs: self.gs.clone(), }) } @@ -1360,7 +1355,10 @@ impl DB { } pub(crate) fn new(inner: XArc) -> DB { - DB { inner } + DB { + inner, + gs: GlobalTxNState::default(), + } } pub(crate) fn to_ref(&self) -> &KVCore { diff --git a/src/transition.rs b/src/transition.rs index b6be8e1..5d227d8 100644 --- a/src/transition.rs +++ b/src/transition.rs @@ -5,7 +5,7 @@ use crate::value_log::{Entry, MetaBit}; use crate::y::compare_key; use crate::{ Error, IteratorExt, IteratorOptions, KVItem, MergeIterOverBuilder, Result, UniIterator, - ValueStruct, WeakKV, Xiterator, DB, TXN_KEY, + ValueStruct, Xiterator, DB, TXN_KEY, }; use drop_cell::defer; use parking_lot::RwLock; @@ -13,7 +13,7 @@ use std::collections::{BinaryHeap, HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -#[derive(Clone)] +#[derive(Clone, Default)] pub struct GlobalTxNState { inner: TArcRW, } @@ -32,6 +32,19 @@ struct GlobalTxNStateInner { commits: HashMap, } +impl Default for GlobalTxNStateInner { + fn default() -> Self { + GlobalTxNStateInner { + lock: Default::default(), + cur_read: Default::default(), + next_commit: AtomicU64::new(1), + commit_mark: Default::default(), + pending_commits: Default::default(), + commits: Default::default(), + } + } +} + impl GlobalTxNStateInner { fn read_ts(&self) -> u64 { self.cur_read.load(Ordering::SeqCst) @@ -117,7 +130,6 @@ pub struct TxN { pub(crate) pending_writes: Arc, Entry>>>, pub(crate) kv: DB, - pub(crate) gs: GlobalTxNState, } impl TxN { @@ -155,8 +167,7 @@ impl TxN { } pub async fn get(&self, key: &[u8]) -> Result { - let kv_db = self.kv.upgrade().unwrap(); - let kv_db = DB::new(kv_db); + let kv_db = self.get_kv(); let item = KVItem::from(KVItemInner::new( vec![], ValueStruct::default(), @@ -198,7 +209,8 @@ impl TxN { return Ok(()); // Ready only translation. } // TODO FIXME lock - let mut gs = self.gs.inner.write().await; + let gs = self.get_kv().txn_state.clone(); + let mut gs = gs.inner.write().await; let commit_ts = gs.new_commit_ts(&self); if commit_ts == 0 { return Err(Error::TxCommitConflict); @@ -257,8 +269,7 @@ impl TxN { } pub(crate) fn get_kv(&self) -> DB { - let kv_db = self.kv.upgrade().unwrap(); - DB::new(kv_db) + self.kv.clone() } } From 6f4be2e584661dc214f4b10f79581badb39e4c74 Mon Sep 17 00:00:00 2001 From: Rg Date: Wed, 22 Nov 2023 19:02:15 +0800 Subject: [PATCH 11/18] :dog: --- src/iterator.rs | 54 +++++++++++++++++++++++++++++++++++++++++++------ src/levels.rs | 2 +- src/y/mod.rs | 24 +++++++++++----------- 3 files changed, 61 insertions(+), 19 deletions(-) diff --git a/src/iterator.rs b/src/iterator.rs index 3393142..f28523e 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -539,15 +539,16 @@ impl IteratorExt { // forward iteration is more common than reverse. // // This function advances the iterator. - fn parse_item(&self) -> bool { - let itr = self.itr; + // TODO ... + async fn parse_item(&self) -> bool { + let itr = &self.itr; let item = itr.peek().unwrap(); // Skip bager keys. if item.key().starts_with(_BADGER_PREFIX) { itr.next(); return false; } - // Skip any version which are beyond the read_ts + // Skip any version which are *beyond* the read_ts let ver = crate::y::parse_ts(item.key()); if ver > self.read_ts { itr.next(); @@ -560,13 +561,54 @@ impl IteratorExt { itr.next(); return false; } - // Only track in froward direction. + // Only track in forward direction. // We should update last_key as soon as we find a different key in our snapshot. // Consider keys: a 5, b 7 (del), b 5. When iterating, last_key = a. // Then we see b 7, which is deleted. If we don't store last_key = b, we'll then return b 5, - // + // which is wrong. Therefore, update lastKey here. + let mut last_key = self.last_key.write(); + last_key.clear(); + last_key.extend_from_slice(itr.peek().unwrap().key()); + } + + loop { + // If deleted, advance and return. + if itr.peek().unwrap().value().meta & MetaBit::BIT_DELETE.bits() > 0 { + itr.next(); + return false; + } + + let item = self.new_item(); + self.fill(item.clone()).await; + // fill item based on current cursor position. All Next calls have returned, so reaching here + // means no Next was called. + itr.next(); + if !self.opt.reverse || itr.peek().is_none() { + let mut itr_item = self.item.write(); + if itr_item.is_none() { + *itr_item = Some(item); + } + return true; + } + + // Reverse direction + let next_ts = crate::y::parse_ts(itr.peek().as_ref().unwrap().key()); + if next_ts <= self.read_ts + && crate::y::same_key_ignore_version( + itr.peek().unwrap().key(), + item.key().await.as_ref(), + ) + { + // This is a valid potential candidate. + continue; + } + // Ignore the next candidate. Return the current one. + let mut itr_item = self.item.write(); + if itr_item.is_none() { + *itr_item = Some(item); + } + return true; } - false } // Returns false when iteration is done. diff --git a/src/levels.rs b/src/levels.rs index 44eb7be..ef6c92e 100644 --- a/src/levels.rs +++ b/src/levels.rs @@ -21,6 +21,7 @@ use log::{debug, error, info, warn}; use parking_lot::lock_api::RawRwLock; use tracing::instrument; +use crate::pb::badgerpb3::manifest_change::Operation::{CREATE, DELETE}; use itertools::Itertools; use rand::random; use std::collections::HashSet; @@ -35,7 +36,6 @@ use std::vec; use tokio::macros::support::thread_rng_n; use tokio::sync::{RwLock, RwLockWriteGuard}; use tokio::time::sleep; -use crate::pb::badgerpb3::manifest_change::Operation::{CREATE, DELETE}; #[derive(Clone)] pub(crate) struct LevelsController { diff --git a/src/y/mod.rs b/src/y/mod.rs index 3a26ed0..3061ab2 100644 --- a/src/y/mod.rs +++ b/src/y/mod.rs @@ -17,9 +17,9 @@ use std::fs::{File, OpenOptions}; use std::hash::Hasher; use std::io::{Cursor, ErrorKind, Write}; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::backtrace::Backtrace; use std::{array, cmp, io}; -use byteorder::{WriteBytesExt, BigEndian, ReadBytesExt}; use thiserror::Error; use tracing::info; @@ -273,7 +273,7 @@ pub(crate) fn parallel_load_block_key(fp: File, offsets: Vec) -> Vec String { } // Generates a new key by appending ts to key. +#[inline(always)] pub(crate) fn key_with_ts(key: &[u8], ts: u64) -> Vec { let mut out = vec![0u8; key.len() + 8]; out.copy_from_slice(key); @@ -390,27 +391,26 @@ pub(crate) fn parse_key(out: &[u8]) -> &[u8] { } // checks for key equality ignoring the version timestamp suffix. -pub(crate) fn same_key_ignore_version(src: &[u8], dst: &[u8]) -> bool { - if src.len() != dst.len() { +#[inline(always)] +pub(crate) fn same_key_ignore_version>(src: T, dst: T) -> bool { + if src.as_ref().len() != dst.as_ref().len() { return false; } - let key_src = parse_key(src); - let key_dst = parse_key(dst); + let key_src = parse_key(src.as_ref()); + let key_dst = parse_key(dst.as_ref()); key_src == key_dst } - // compare_keys checks the key without timestamp and checks the timestamp if keyNoTs // is same. // a would be sorted higher than aa if we use bytes.compare // All keys should have timestamp. +#[inline(always)] pub(crate) fn compare_key(key1: &[u8], key2: &[u8]) -> Ordering { assert!(key1.len() > 8); assert!(key2.len() > 8); return match parse_key(&key1).cmp(&parse_key(key2)) { - Ordering::Equal => { - parse_ts(key1).cmp(&parse_ts(key2)) - } + Ordering::Equal => parse_ts(key1).cmp(&parse_ts(key2)), other => other, }; } @@ -431,8 +431,8 @@ fn dsync() { /// find a value in array with binary search pub fn binary_search(array: &[T], f: F) -> Option - where - F: Fn(&T) -> Ordering, +where + F: Fn(&T) -> Ordering, { let mut low = 0; let mut high = array.len() - 1; From 7a8465a654ff3dc4f3ef58780dba65bcecb01701 Mon Sep 17 00:00:00 2001 From: Rg Date: Thu, 23 Nov 2023 18:45:36 +0800 Subject: [PATCH 12/18] :dog: --- src/iterator.rs | 51 +++++++++++++++++-------------------------------- src/skl/skip.rs | 10 ++++++---- src/y/mod.rs | 5 +++-- 3 files changed, 26 insertions(+), 40 deletions(-) diff --git a/src/iterator.rs b/src/iterator.rs index f28523e..27f3751 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -290,7 +290,7 @@ pub struct IteratorExt { // kv: DB, itr: MergeIterator, opt: IteratorOptions, - item: ArcRW>, + //item: ArcRW>, // Cache the prefetch keys, not include current value data: ArcRW>, has_rewind: ArcRW, @@ -365,7 +365,7 @@ impl IteratorExt { break; } self.pre_fetch().await; - self.item.read().clone() + self.peek().await } // Rewind the iterator cursor all the wy to zero-th position, which would be the @@ -379,51 +379,26 @@ impl IteratorExt { // rewind the iterator // rewind, next, rewind?, thie item is who! let mut _item = self.itr.rewind(); - // filter internal data - //while item.is_some() && item.as_ref().unwrap().key().starts_with(_BADGER_PREFIX) { - // item = self.itr.next(); - //} - // Before every rewind, the item will be reset to None - self.item.write().take(); // prefetch item. self.pre_fetch().await; // return the first el. - self.item.read().clone() + self.peek().await } // Advance the iterator by one (*NOTICE*: must be rewind when you call self.next()) pub async fn next(&self) -> Option { - // Ensure current item has load - if let Some(el) = self.item.write().take() { - el.rl().await.wg.wait().await; // Just cleaner to wait before pushing to avoid doing ref counting. - } - // Set next item to current - if let Some(el) = self.data.write().pop_front() { - self.item.write().replace(el); - } // Advance internal iterator until entry is not deleted - while let Some(el) = self.itr.next() { - if el.key().starts_with(_BADGER_PREFIX) { - continue; - } - if el.value().meta & MetaBit::BIT_DELETE.bits() == 0 { + while self.itr.next().is_some() { + if self.parse_item().await { // Not deleted break; } } - let item = self.itr.peek(); - if item.is_none() { - return None; - } - - let xitem = self.new_item(); - self.fill(xitem.clone()).await; - self.data.write().push_back(xitem.clone()); - Some(xitem) + self.peek().await } pub async fn peek(&self) -> Option { - self.item.read().clone() + self.data.read().front().map(|item| item.clone()) } } @@ -451,7 +426,7 @@ impl IteratorExt { Ok(()) } - // fill the value + // fill the value of merge iterator into item async fn fill(&self, item: KVItem) { let vs = self.itr.peek().unwrap(); let vs = vs.value(); @@ -557,6 +532,8 @@ impl IteratorExt { // If iteration in forward direction. then just checking the last key against current key would // be sufficient. if !self.opt.reverse { + // The key has acceed, so don't access it again. + // TODO FIXME, I don't think so, a 5, b 7 (del), b5, b 6 if crate::y::same_key_ignore_version(self.last_key.write().as_ref(), item.key()) { itr.next(); return false; @@ -579,14 +556,18 @@ impl IteratorExt { } let item = self.new_item(); + // Load key, en,en,en, maybe lazy load self.fill(item.clone()).await; // fill item based on current cursor position. All Next calls have returned, so reaching here // means no Next was called. - itr.next(); + itr.next(); // Advance but no fill item yet. if !self.opt.reverse || itr.peek().is_none() { + // forward direction, or invalid. let mut itr_item = self.item.write(); if itr_item.is_none() { *itr_item = Some(item); + } else { + self.data.write().push_back(item); } return true; } @@ -606,6 +587,8 @@ impl IteratorExt { let mut itr_item = self.item.write(); if itr_item.is_none() { *itr_item = Some(item); + } else { + self.data.write().push_back(item); } return true; } diff --git a/src/skl/skip.rs b/src/skl/skip.rs index 000393d..e7d9c01 100644 --- a/src/skl/skip.rs +++ b/src/skl/skip.rs @@ -462,15 +462,17 @@ impl Display for SkipList { k: String, v: String, } - let mut kv = vec![]; + let mut kv = vec![KV { + k: "st-id".to_owned(), + v: self.id().to_string(), + }]; for _kv in self.key_values() { kv.push(KV { - k: String::from_utf8(_kv.0.to_vec()).unwrap(), - v: String::from_utf8(_kv.1.value).unwrap(), + k: crate::hex_str(_kv.0), + v: crate::hex_str(_kv.1.value), }); } let table = Table::new(kv).to_string(); - writeln!(f, "SkipList=>").unwrap(); writeln!(f, "{}", table) } } diff --git a/src/y/mod.rs b/src/y/mod.rs index 3061ab2..a0c1a4f 100644 --- a/src/y/mod.rs +++ b/src/y/mod.rs @@ -360,8 +360,9 @@ pub(crate) async fn async_sync_directory(d: String) -> Result<()> { Ok(()) } -pub(crate) fn hex_str(buf: &[u8]) -> String { - String::from_utf8(buf.to_vec()).unwrap_or_else(|_| "Sorry, Hex String Failed!!!".to_string()) +pub(crate) fn hex_str>(buf: T) -> String { + String::from_utf8(buf.as_ref().to_vec()) + .unwrap_or_else(|_| "Sorry, Hex String Failed!!!".to_string()) } // Generates a new key by appending ts to key. From 5e08a995f7e3c24d6accdbb8e3d05e1ff6d1abbd Mon Sep 17 00:00:00 2001 From: Rg Date: Fri, 24 Nov 2023 01:13:01 +0800 Subject: [PATCH 13/18] :cat: --- src/iterator.rs | 92 ++++++++++++------------------------------------ src/kv.rs | 1 - src/value_log.rs | 4 +-- 3 files changed, 24 insertions(+), 73 deletions(-) diff --git a/src/iterator.rs b/src/iterator.rs index 27f3751..788ddf5 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -149,7 +149,7 @@ impl KVItemInner { Ok(()) }) }) - .await?; + .await?; Ok(ch.recv().await.unwrap()) } @@ -160,7 +160,7 @@ impl KVItemInner { // Note that the call to the consumer func happens synchronously. pub(crate) async fn value( &self, - mut consumer: impl FnMut(&[u8]) -> Pin> + Send>>, + mut consumer: impl FnMut(&[u8]) -> Pin> + Send>>, ) -> Result<()> { // Wait result self.wg.wait().await; @@ -205,7 +205,7 @@ impl KVItemInner { Ok(()) }) }) - .await + .await } // Returns approximate size of the key-value pair. @@ -264,6 +264,7 @@ impl Default for IteratorOptions { impl IteratorOptions { pub fn new(pre_fetch_values: bool, pre_fetch_size: isize, reverse: bool) -> Self { + assert!(pre_fetch_size > 0); IteratorOptions { pre_fetch_values, pre_fetch_size, @@ -331,15 +332,15 @@ pub struct IteratorExt { impl IteratorExt { pub(crate) fn new(tv: *const TxN, itr: MergeIterator, opt: IteratorOptions) -> IteratorExt { - /*IteratorExt { - kv, + IteratorExt { + txn: BoxTxN::new(tv), opt, itr, data: ArcRW::default(), - item: Arc::new(Default::default()), has_rewind: ArcRW::default(), - }*/ - todo!() + read_ts: 0, + last_key: Arc::new(Default::default()), + } } // pub(crate) async fn new_async_iterator( @@ -358,12 +359,7 @@ impl IteratorExt { while let Some(el) = self.data.write().pop_front() { el.rl().await.wg.wait().await; } - while let Some(el) = self.itr.seek(key) { - if el.key().starts_with(_BADGER_PREFIX) { - continue; - } - break; - } + self.itr.seek(key); self.pre_fetch().await; self.peek().await } @@ -377,8 +373,8 @@ impl IteratorExt { el.rl().await.wg.wait().await; } // rewind the iterator - // rewind, next, rewind?, thie item is who! - let mut _item = self.itr.rewind(); + // rewind, next, rewind?, the item is who! + self.itr.rewind(); // prefetch item. self.pre_fetch().await; // return the first el. @@ -386,6 +382,7 @@ impl IteratorExt { } // Advance the iterator by one (*NOTICE*: must be rewind when you call self.next()) + // to ensure you have access to a valid it.item() pub async fn next(&self) -> Option { // Advance internal iterator until entry is not deleted while self.itr.next().is_some() { @@ -403,21 +400,6 @@ impl IteratorExt { } impl IteratorExt { - // Returns false when iteration is done - // or when the current key is not prefixed by the specified prefix. - async fn valid_for_prefix(&self, prefix: &[u8]) -> bool { - self.item.read().is_some() - && self - .item - .read() - .as_ref() - .unwrap() - .rl() - .await - .key() - .starts_with(prefix) - } - // Close the iterator, It is important to call this when you're done with iteration. pub async fn close(&self) -> Result<()> { // TODO: We could handle this error. @@ -466,28 +448,13 @@ impl IteratorExt { let itr = &self.itr; let mut count = 0; while let Some(item) = itr.peek() { - if item.key().starts_with(crate::kv::_BADGER_PREFIX) { - itr.next(); - continue; - } - if item.value().meta & MetaBit::BIT_DELETE.bits() > 0 { - itr.next(); + if !self.parse_item().await { continue; } count += 1; - let xitem = self.new_item(); - // fill a el from itr.peek - self.fill(xitem.clone()).await; - if self.item.read().is_none() { - self.item.write().replace(xitem); // store it - } else { - // push prefetch el into cache queue, Notice it not including current item - self.data.write().push_back(xitem); - } if count == pre_fetch_size { break; } - itr.next(); } } @@ -510,7 +477,7 @@ impl IteratorExt { // The is a complex function because it needs to handle both forward and reverse iteration // implementation. We store keys such that their version are sorted in descending order. This makes - // forward iteration efficient, but revese iteration complecated. This tradeoff is better because + // forward iteration efficient, but reverse iteration complecated. This tradeoff is better because // forward iteration is more common than reverse. // // This function advances the iterator. @@ -518,7 +485,7 @@ impl IteratorExt { async fn parse_item(&self) -> bool { let itr = &self.itr; let item = itr.peek().unwrap(); - // Skip bager keys. + // Skip badger keys. if item.key().starts_with(_BADGER_PREFIX) { itr.next(); return false; @@ -532,7 +499,7 @@ impl IteratorExt { // If iteration in forward direction. then just checking the last key against current key would // be sufficient. if !self.opt.reverse { - // The key has acceed, so don't access it again. + // The key has accessed, so don't access it again. // TODO FIXME, I don't think so, a 5, b 7 (del), b5, b 6 if crate::y::same_key_ignore_version(self.last_key.write().as_ref(), item.key()) { itr.next(); @@ -563,12 +530,7 @@ impl IteratorExt { itr.next(); // Advance but no fill item yet. if !self.opt.reverse || itr.peek().is_none() { // forward direction, or invalid. - let mut itr_item = self.item.write(); - if itr_item.is_none() { - *itr_item = Some(item); - } else { - self.data.write().push_back(item); - } + self.data.write().push_back(item); return true; } @@ -576,29 +538,19 @@ impl IteratorExt { let next_ts = crate::y::parse_ts(itr.peek().as_ref().unwrap().key()); if next_ts <= self.read_ts && crate::y::same_key_ignore_version( - itr.peek().unwrap().key(), - item.key().await.as_ref(), - ) + itr.peek().unwrap().key(), + item.key().await.as_ref(), + ) { // This is a valid potential candidate. continue; } // Ignore the next candidate. Return the current one. - let mut itr_item = self.item.write(); - if itr_item.is_none() { - *itr_item = Some(item); - } else { - self.data.write().push_back(item); - } + self.data.write().push_back(item); return true; } } - // Returns false when iteration is done. - fn valid(&self) -> bool { - self.item.read().is_some() - } - fn txn(&self) -> &TxN { unsafe { self.txn.tx.as_ref().unwrap() } } diff --git a/src/kv.rs b/src/kv.rs index c59d0f0..0f5026c 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -1357,7 +1357,6 @@ impl DB { pub(crate) fn new(inner: XArc) -> DB { DB { inner, - gs: GlobalTxNState::default(), } } diff --git a/src/value_log.rs b/src/value_log.rs index f5e8d39..fadb115 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -761,8 +761,8 @@ impl ValueLogCore { #[cfg(test)] debug!( "Write # {:?} => {} into vlog file, offset: {}, meta:{}", - hex_str(entry.entry().key.as_ref()), - hex_str(entry.entry().value.as_ref()), + hex_str(&entry.entry().key), + hex_str(&entry.entry().value), self.buf.read().await.get_ref().len() + self.writable_log_offset.load(Ordering::Acquire) as usize, entry.entry.meta, From cf0018edeb7df99f7dd317ba6b0b3b371d74737f Mon Sep 17 00:00:00 2001 From: Rg Date: Thu, 30 Nov 2023 20:49:34 +0800 Subject: [PATCH 14/18] Hii --- src/backup.rs | 9 ++++++--- src/iterator.rs | 13 ++++++------- src/skl/skip.rs | 17 ++++++----------- src/y/codec.rs | 2 +- src/y/mod.rs | 2 +- 5 files changed, 20 insertions(+), 23 deletions(-) diff --git a/src/backup.rs b/src/backup.rs index 1bc0829..7fa4493 100644 --- a/src/backup.rs +++ b/src/backup.rs @@ -1,9 +1,12 @@ -use std::io::Write; +use crate::pb::backup::KVPair; use byteorder::{LittleEndian, WriteBytesExt}; use protobuf::Message; -use crate::pb::backup::KVPair; +use std::io::Write; -pub fn write_to(entry: &KVPair, wt: &mut W) -> crate::Result<()> where W: Write { +pub fn write_to(entry: &KVPair, wt: &mut W) -> crate::Result<()> +where + W: Write, +{ let buf = entry.write_to_bytes().unwrap(); wt.write_u64::(buf.len() as u64)?; wt.write_all(&buf)?; diff --git a/src/iterator.rs b/src/iterator.rs index 788ddf5..6ff2e5d 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -84,7 +84,6 @@ pub(crate) struct KVItemInner { pub(crate) status: Arc>, kv: DB, pub(crate) key: Vec, - // TODO, Opz memory pub(crate) vptr: Vec, pub(crate) value: TArcMx>, pub(crate) meta: u8, @@ -149,7 +148,7 @@ impl KVItemInner { Ok(()) }) }) - .await?; + .await?; Ok(ch.recv().await.unwrap()) } @@ -160,7 +159,7 @@ impl KVItemInner { // Note that the call to the consumer func happens synchronously. pub(crate) async fn value( &self, - mut consumer: impl FnMut(&[u8]) -> Pin> + Send>>, + mut consumer: impl FnMut(&[u8]) -> Pin> + Send>>, ) -> Result<()> { // Wait result self.wg.wait().await; @@ -205,7 +204,7 @@ impl KVItemInner { Ok(()) }) }) - .await + .await } // Returns approximate size of the key-value pair. @@ -538,9 +537,9 @@ impl IteratorExt { let next_ts = crate::y::parse_ts(itr.peek().as_ref().unwrap().key()); if next_ts <= self.read_ts && crate::y::same_key_ignore_version( - itr.peek().unwrap().key(), - item.key().await.as_ref(), - ) + itr.peek().unwrap().key(), + item.key().await.as_ref(), + ) { // This is a valid potential candidate. continue; diff --git a/src/skl/skip.rs b/src/skl/skip.rs index e7d9c01..fa2d63f 100644 --- a/src/skl/skip.rs +++ b/src/skl/skip.rs @@ -74,15 +74,10 @@ impl SkipList { } // Sub crease the reference count, deallocating the skiplist when done using it - // TODO pub fn decr_ref(&self) { self._ref.fetch_sub(1, Ordering::Release); } - // fn valid(&self) -> bool { - // self.arena_ref().valid() - // } - pub(crate) fn arena_ref(&self) -> &Arena { &self.arena } @@ -226,8 +221,8 @@ impl SkipList { /// Inserts the key-value pair. /// FIXME: it bad, should be not use unsafe, but .... - pub fn put(&self, key: &[u8], v: ValueStruct) { - self._put(key, v) + pub fn put>(&self, key: T, v: ValueStruct) { + self._put(key.as_ref(), v) } fn _put(&self, key: &[u8], v: ValueStruct) { @@ -883,7 +878,7 @@ mod tests { #[test] fn t_one_key() { - let key = "thekey"; + let key = b"thekey"; let st = Arc::new(SkipList::new(ARENA_SIZE)); let mut waits = (0..100) .map(|i| { @@ -891,8 +886,8 @@ mod tests { let key = key.clone(); spawn(move || { st.put( - key.as_bytes(), - ValueStruct::new(format!("{}", i).as_bytes().to_vec(), 0, 0, i as u64), + &key, + ValueStruct::new(i.to_string().as_bytes().to_vec(), 0, 0, i as u64), ); }) }) @@ -904,7 +899,7 @@ mod tests { let key = key.clone(); let save_value = save_value.clone(); let join = spawn(move || { - if let Some(value) = st.get(key.as_bytes()) { + if let Some(value) = st.get(&key) { save_value.fetch_add(1, Ordering::Relaxed); let v = String::from_utf8_lossy(&value.value) .parse::() diff --git a/src/y/codec.rs b/src/y/codec.rs index 88f492e..78cfaa8 100644 --- a/src/y/codec.rs +++ b/src/y/codec.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use std::io::{Read, Write}; +use tokio::io::{AsyncRead, AsyncWrite}; use crate::Result; @@ -10,7 +11,6 @@ pub trait Encode { pub trait Decode { fn dec(&mut self, rd: &mut dyn Read) -> Result<()>; } -use tokio::io::{AsyncRead, AsyncWrite}; #[async_trait] pub trait AsyncEncDec diff --git a/src/y/mod.rs b/src/y/mod.rs index a0c1a4f..e9711d3 100644 --- a/src/y/mod.rs +++ b/src/y/mod.rs @@ -385,7 +385,7 @@ pub(crate) fn parse_ts(out: &[u8]) -> u64 { #[inline(always)] pub(crate) fn parse_key(out: &[u8]) -> &[u8] { - if out.is_empty() { + if out.len() < 8 { return out; } &out[..(out.len() - 8)] From 8ae3188c052be4a3c88d7d10ff86f7e6bb2fa241 Mon Sep 17 00:00:00 2001 From: Rg Date: Thu, 7 Dec 2023 20:46:38 +0800 Subject: [PATCH 15/18] :cat: --- Cargo.toml | 1 + src/iterator.rs | 16 ++++++++++------ src/kv_test.rs | 9 ++++++++- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4a24389..db96862 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ protobuf = { version = "3.0.0-alpha.2", features = ["with-bytes"] } clap = { version = "4.4.6", features = ["derive"] } structopt = "0.3.26" farmhash = "1.1.5" +arc-cell = "0.3.3" [dev-dependencies] tracing-subscriber = "0.3.17" tracing-log = "0.2.0" diff --git a/src/iterator.rs b/src/iterator.rs index 6ff2e5d..1d43aa6 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -7,8 +7,10 @@ use crate::{ Decode, MergeIterator, Result, Xiterator, EMPTY_SLICE, }; +use arc_cell::ArcCell; use atomic::Atomic; +use core::slice::SlicePattern; use std::fmt::{Debug, Display, Formatter}; use std::future::Future; @@ -290,11 +292,10 @@ pub struct IteratorExt { // kv: DB, itr: MergeIterator, opt: IteratorOptions, - //item: ArcRW>, // Cache the prefetch keys, not include current value data: ArcRW>, has_rewind: ArcRW, - last_key: ArcRW>, + last_key: Arc>>, } /// TODO FIXME @@ -500,7 +501,7 @@ impl IteratorExt { if !self.opt.reverse { // The key has accessed, so don't access it again. // TODO FIXME, I don't think so, a 5, b 7 (del), b5, b 6 - if crate::y::same_key_ignore_version(self.last_key.write().as_ref(), item.key()) { + if crate::y::same_key_ignore_version(self.get_last_key().as_slice(), item.key()) { itr.next(); return false; } @@ -509,9 +510,8 @@ impl IteratorExt { // Consider keys: a 5, b 7 (del), b 5. When iterating, last_key = a. // Then we see b 7, which is deleted. If we don't store last_key = b, we'll then return b 5, // which is wrong. Therefore, update lastKey here. - let mut last_key = self.last_key.write(); - last_key.clear(); - last_key.extend_from_slice(itr.peek().unwrap().key()); + self.last_key + .set(Arc::new(itr.peek().unwrap().key().to_vec())); } loop { @@ -553,4 +553,8 @@ impl IteratorExt { fn txn(&self) -> &TxN { unsafe { self.txn.tx.as_ref().unwrap() } } + + fn get_last_key(&self) -> Arc> { + self.last_key.get() + } } diff --git a/src/kv_test.rs b/src/kv_test.rs index f4778e9..74bb3c1 100644 --- a/src/kv_test.rs +++ b/src/kv_test.rs @@ -40,6 +40,13 @@ async fn t_1_write() { let got = kv.get(b"hello").await; assert!(got.is_ok()); assert_eq!(&got.unwrap(), b"word"); + + // Write again + let res = kv.set(b"hello".to_vec(), b"word1".to_vec(), 20).await; + assert!(res.is_ok()); + let got = kv.get(b"hello").await; + assert!(got.is_ok()); + assert_eq!(&got.unwrap(), b"word1"); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -495,7 +502,7 @@ async fn kv_iterator_basic() { tracing_log(); let kv = build_kv().await; - let n = 10000; + let n = 1; let bkey = |i: usize| format!("{:09}", i).as_bytes().to_vec(); let bvalue = |i: usize| format!("{:025}", i).as_bytes().to_vec(); From 5209fcc64829db9de0c7984692d4e901824cf61e Mon Sep 17 00:00:00 2001 From: Rg Date: Wed, 20 Dec 2023 19:43:10 +0800 Subject: [PATCH 16/18] :cat: --- src/iterator.rs | 7 ++++ src/kv.rs | 4 +-- src/kv_test.rs | 5 +-- src/lib.rs | 2 ++ src/test_util.rs | 11 ++++++ src/transition.rs | 78 +++++++++++++++++++++-------------------- src/y/merge_iterator.rs | 4 ++- src/y/mod.rs | 10 +++--- 8 files changed, 72 insertions(+), 49 deletions(-) diff --git a/src/iterator.rs b/src/iterator.rs index 1d43aa6..46e387c 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use std::{io::Cursor, sync::atomic::AtomicU64}; use tokio::io::AsyncWriteExt; use tokio::sync::{RwLockReadGuard, RwLockWriteGuard}; +use tracing::info; #[derive(Debug, PartialEq, Copy, Clone)] pub(crate) enum PreFetchStatus { @@ -456,6 +457,7 @@ impl IteratorExt { break; } } + info!("Has fetch count: {}", count); } fn new_item(&self) -> KVItem { @@ -492,6 +494,11 @@ impl IteratorExt { } // Skip any version which are *beyond* the read_ts let ver = crate::y::parse_ts(item.key()); + info!( + "The version is {}, read_ts: {}", + hex_str(item.key()), + self.read_ts + ); if ver > self.read_ts { itr.next(); return false; diff --git a/src/kv.rs b/src/kv.rs index 0f5026c..b3bb088 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -1355,9 +1355,7 @@ impl DB { } pub(crate) fn new(inner: XArc) -> DB { - DB { - inner, - } + DB { inner } } pub(crate) fn to_ref(&self) -> &KVCore { diff --git a/src/kv_test.rs b/src/kv_test.rs index 74bb3c1..9c4ca22 100644 --- a/src/kv_test.rs +++ b/src/kv_test.rs @@ -526,8 +526,9 @@ async fn kv_iterator_basic() { let itr = kv.new_iterator(opt).await; let mut count = 0; let mut rewind = true; - info!("Startinh first basic iteration"); - itr.rewind().await; + info!("Starting first basic iteration"); + let first = itr.rewind().await; + info!("first {:?}", first); while let Some(item) = itr.peek().await { let rd_item = item.rl().await; let key = rd_item.key(); diff --git a/src/lib.rs b/src/lib.rs index ef880fb..81055dd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,6 +57,8 @@ mod st_manager; #[cfg(test)] mod test_util; mod transition; +#[cfg(test)] +mod transition_test; pub use iterator::*; pub use kv::*; diff --git a/src/test_util.rs b/src/test_util.rs index 9a03b4b..cabb1b8 100644 --- a/src/test_util.rs +++ b/src/test_util.rs @@ -14,6 +14,7 @@ use tokio_metrics::TaskMonitor; use tracing_subscriber::fmt::format::Writer; use tracing_subscriber::fmt::time::FormatTime; use tracing_subscriber::EnvFilter; +use crate::Options; #[cfg(test)] pub fn push_log_by_filename(fpath: &str, buf: &[u8]) { @@ -217,3 +218,13 @@ fn tk2() { use itertools::Merge; } + + +pub(crate) fn get_test_option(dir: &str) -> Options { + let mut opt = Options::default(); + opt.max_table_size = 1 << 15; // Force more compaction. + opt.level_one_size = 4 << 15; // Force more compaction. + opt.dir = Box::new(dir.to_string()); + opt.value_dir = Box::new(dir.to_string()); + opt +} diff --git a/src/transition.rs b/src/transition.rs index 5d227d8..86b2974 100644 --- a/src/transition.rs +++ b/src/transition.rs @@ -56,7 +56,7 @@ impl GlobalTxNStateInner { if tx_reads.is_empty() { return false; } - + // check the all tx keys version must be more than `commits` for ro in tx_reads.iter() { if let Some(ts) = self.commits.get(ro) && *ts > tx.read_ts @@ -211,6 +211,7 @@ impl TxN { // TODO FIXME lock let gs = self.get_kv().txn_state.clone(); let mut gs = gs.inner.write().await; + // Get a new commit ts for current tx let commit_ts = gs.new_commit_ts(&self); if commit_ts == 0 { return Err(Error::TxCommitConflict); @@ -232,6 +233,10 @@ impl TxN { .value(commit_ts.to_string().as_bytes().to_vec()) .meta(MetaBit::BIT_FIN_TXN.bits()); entries.push(entry); + + // TODO if has some key failed to execute .....? + defer! {gs.done_commit(commit_ts)}; + let db = self.get_kv(); let res = db.batch_set(entries).await; @@ -242,50 +247,47 @@ impl TxN { Ok(()) } - fn new_iterator(&self, opt: IteratorOptions) -> IteratorExt { - let db = self.get_kv(); - // Notice, the iterator is global iterator, so must incr reference for memtable(SikpList), sst(file), vlog(file). - let p = crossbeam_epoch::pin(); - let tables = db.get_mem_tables(&p); - // TODO it should decr at IteratorExt close. - defer! { + fn new_iterator(&self, opt: IteratorOptions) -> IteratorExt { + let db = self.get_kv(); + // Notice, the iterator is global iterator, so must incr reference for memtable(SikpList), sst(file), vlog(file). + let p = crossbeam_epoch::pin(); + let tables = db.get_mem_tables(&p); + // TODO it should decr at IteratorExt close. + defer! { tables.iter().for_each(|table| unsafe {table.as_ref().unwrap().decr_ref()}); } - // add vlog reference. - db.must_vlog().incr_iterator_count(); - - // Create iterators across all the tables involved first. - let mut itrs: Vec>> = vec![]; - for tb in tables.clone() { - let st = unsafe { tb.as_ref().unwrap().clone() }; - let iter = Box::new(UniIterator::new(st, opt.reverse)); - itrs.push(iter); + // add vlog reference. + db.must_vlog().incr_iterator_count(); + + // Create iterators across all the tables involved first. + let mut itrs: Vec>> = vec![]; + for tb in tables.clone() { + let st = unsafe { tb.as_ref().unwrap().clone() }; + let iter = Box::new(UniIterator::new(st, opt.reverse)); + itrs.push(iter); + } + // Extend sst.table + itrs.extend(db.must_lc().as_iterator(opt.reverse)); + let mitr = MergeIterOverBuilder::default().add_batch(itrs).build(); + let txn = self as *const TxN; + IteratorExt::new(txn, mitr, opt) } - // Extend sst.table - itrs.extend(db.must_lc().as_iterator(opt.reverse)); - let mitr = MergeIterOverBuilder::default().add_batch(itrs).build(); - let txn = self as *const TxN; - IteratorExt::new(txn, mitr, opt) - } - pub(crate) fn get_kv(&self) -> DB { - self.kv.clone() + pub(crate) fn get_kv(&self) -> DB { + self.kv.clone() + } } -} -pub(crate) struct BoxTxN { - pub tx: *const TxN, -} + pub(crate) struct BoxTxN { + pub tx: *const TxN, + } -unsafe impl Send for BoxTxN {} + unsafe impl Send for BoxTxN {} -unsafe impl Sync for BoxTxN {} + unsafe impl Sync for BoxTxN {} -impl BoxTxN { - pub(crate) fn new(tx: *const TxN) -> BoxTxN { - BoxTxN { tx } + impl BoxTxN { + pub(crate) fn new(tx: *const TxN) -> BoxTxN { + BoxTxN { tx } + } } -} - -#[test] -fn it() {} diff --git a/src/y/merge_iterator.rs b/src/y/merge_iterator.rs index 5d2b451..cc98a13 100644 --- a/src/y/merge_iterator.rs +++ b/src/y/merge_iterator.rs @@ -51,6 +51,7 @@ impl Xiterator for MergeIterator { self.set_iter_empty(); return None; } + // Must be call rewind before call next assert_ne!(self.cursor.borrow().index, usize::MAX); self._next() } @@ -62,7 +63,7 @@ impl Xiterator for MergeIterator { return None; } { - // Before every rewind, all flags will be resetted + // Before every rewind, all flags will be reset self.reset(); for (index, itr) in self.itrs.iter().enumerate() { if itr.rewind().is_some() { @@ -177,6 +178,7 @@ impl MergeIterator { { let heap = self.heap.borrow_mut(); if heap.is_empty() { + // TODO why self.set_iter_empty(); return None; } diff --git a/src/y/mod.rs b/src/y/mod.rs index e9711d3..4394c12 100644 --- a/src/y/mod.rs +++ b/src/y/mod.rs @@ -273,7 +273,7 @@ pub(crate) fn parallel_load_block_key(fp: File, offsets: Vec) -> Vec>(buf: T) -> String { #[inline(always)] pub(crate) fn key_with_ts(key: &[u8], ts: u64) -> Vec { let mut out = vec![0u8; key.len() + 8]; - out.copy_from_slice(key); - out.write_u64::(u64::MAX - ts).unwrap(); + out[..key.len()].copy_from_slice(key); + (&mut out[key.len()..]).write_u64::(u64::MAX - ts).unwrap(); out } @@ -432,8 +432,8 @@ fn dsync() { /// find a value in array with binary search pub fn binary_search(array: &[T], f: F) -> Option -where - F: Fn(&T) -> Ordering, + where + F: Fn(&T) -> Ordering, { let mut low = 0; let mut high = array.len() - 1; From 22b53a1713f6b5636ee620b71109410fd980cc61 Mon Sep 17 00:00:00 2001 From: Rg Date: Fri, 29 Dec 2023 19:51:36 +0800 Subject: [PATCH 17/18] update --- src/transition.rs | 83 ++++++++++++++++++++++++++++------------------- 1 file changed, 49 insertions(+), 34 deletions(-) diff --git a/src/transition.rs b/src/transition.rs index 86b2974..9d4e9f8 100644 --- a/src/transition.rs +++ b/src/transition.rs @@ -10,14 +10,29 @@ use crate::{ use drop_cell::defer; use parking_lot::RwLock; use std::collections::{BinaryHeap, HashMap, HashSet}; +use std::ops::Deref; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use tokio::sync::{RwLockReadGuard, RwLockWriteGuard}; #[derive(Clone, Default)] pub struct GlobalTxNState { inner: TArcRW, } +impl GlobalTxNState { + pub async fn rl(&self) -> RwLockReadGuard<'_, GlobalTxNStateInner> { + self.inner.read().await + } + pub async fn wl(&self) -> RwLockWriteGuard<'_, GlobalTxNStateInner> { + self.inner.write().await + } + + pub async fn read_ts(&self) -> u64 { + self.rl().await.read_ts() + } +} + struct GlobalTxNStateInner { // TODO may be not lock, because outline has locked a time lock: Arc>, @@ -247,47 +262,47 @@ impl TxN { Ok(()) } - fn new_iterator(&self, opt: IteratorOptions) -> IteratorExt { - let db = self.get_kv(); - // Notice, the iterator is global iterator, so must incr reference for memtable(SikpList), sst(file), vlog(file). - let p = crossbeam_epoch::pin(); - let tables = db.get_mem_tables(&p); - // TODO it should decr at IteratorExt close. - defer! { + fn new_iterator(&self, opt: IteratorOptions) -> IteratorExt { + let db = self.get_kv(); + // Notice, the iterator is global iterator, so must incr reference for memtable(SikpList), sst(file), vlog(file). + let p = crossbeam_epoch::pin(); + let tables = db.get_mem_tables(&p); + // TODO it should decr at IteratorExt close. + defer! { tables.iter().for_each(|table| unsafe {table.as_ref().unwrap().decr_ref()}); } - // add vlog reference. - db.must_vlog().incr_iterator_count(); - - // Create iterators across all the tables involved first. - let mut itrs: Vec>> = vec![]; - for tb in tables.clone() { - let st = unsafe { tb.as_ref().unwrap().clone() }; - let iter = Box::new(UniIterator::new(st, opt.reverse)); - itrs.push(iter); - } - // Extend sst.table - itrs.extend(db.must_lc().as_iterator(opt.reverse)); - let mitr = MergeIterOverBuilder::default().add_batch(itrs).build(); - let txn = self as *const TxN; - IteratorExt::new(txn, mitr, opt) - } - - pub(crate) fn get_kv(&self) -> DB { - self.kv.clone() + // add vlog reference. + db.must_vlog().incr_iterator_count(); + + // Create iterators across all the tables involved first. + let mut itrs: Vec>> = vec![]; + for tb in tables.clone() { + let st = unsafe { tb.as_ref().unwrap().clone() }; + let iter = Box::new(UniIterator::new(st, opt.reverse)); + itrs.push(iter); } + // Extend sst.table + itrs.extend(db.must_lc().as_iterator(opt.reverse)); + let mitr = MergeIterOverBuilder::default().add_batch(itrs).build(); + let txn = self as *const TxN; + IteratorExt::new(txn, mitr, opt) } - pub(crate) struct BoxTxN { - pub tx: *const TxN, + pub(crate) fn get_kv(&self) -> DB { + self.kv.clone() } +} - unsafe impl Send for BoxTxN {} +pub(crate) struct BoxTxN { + pub tx: *const TxN, +} - unsafe impl Sync for BoxTxN {} +unsafe impl Send for BoxTxN {} - impl BoxTxN { - pub(crate) fn new(tx: *const TxN) -> BoxTxN { - BoxTxN { tx } - } +unsafe impl Sync for BoxTxN {} + +impl BoxTxN { + pub(crate) fn new(tx: *const TxN) -> BoxTxN { + BoxTxN { tx } } +} From 2fb71baa2cbc7b4fe409ff2397a5c03582d09914 Mon Sep 17 00:00:00 2001 From: Rg Date: Sun, 29 Sep 2024 02:19:58 +0800 Subject: [PATCH 18/18] :-1: --- src/kv_test.rs | 2 +- src/levels.rs | 66 ++++++++++++++------------------------------ src/lib.rs | 3 -- src/table/builder.rs | 2 -- src/test_util.rs | 56 ++++--------------------------------- src/value_log.rs | 42 ++++++---------------------- 6 files changed, 36 insertions(+), 135 deletions(-) diff --git a/src/kv_test.rs b/src/kv_test.rs index 9c4ca22..78cfb67 100644 --- a/src/kv_test.rs +++ b/src/kv_test.rs @@ -134,7 +134,7 @@ async fn t_batch_write() { "cost time: {}s", SystemTime::now().duration_since(start).unwrap().as_secs() ); - kv.must_lc().print_level_fids(); + //kv.must_lc().print_level_fids(); kv.close().await.unwrap(); } diff --git a/src/levels.rs b/src/levels.rs index ef6c92e..2509693 100644 --- a/src/levels.rs +++ b/src/levels.rs @@ -7,7 +7,7 @@ use crate::pb::badgerpb3::ManifestChange; use crate::table::builder::Builder; use crate::table::iterator::{ConcatIterator, IteratorImpl, IteratorItem}; use crate::table::table::{get_id_map, new_file_name, Table, TableCore}; -use crate::types::{Channel, Closer, TArcMx, TArcRW, XArc}; +use crate::types::{Channel, Closer, TArcRW, XArc}; use crate::y::{ async_sync_directory, create_synced_file, open_existing_synced_file, sync_directory, }; @@ -18,24 +18,21 @@ use atomic::Ordering; use awaitgroup::WaitGroup; use drop_cell::defer; use log::{debug, error, info, warn}; -use parking_lot::lock_api::RawRwLock; use tracing::instrument; use crate::pb::badgerpb3::manifest_change::Operation::{CREATE, DELETE}; -use itertools::Itertools; use rand::random; use std::collections::HashSet; use std::fmt::{Debug, Display, Formatter}; use std::fs::remove_file; use std::io::Write; -use std::ops::Deref; +use std::ops::{Deref, MulAssign}; use std::sync::atomic::AtomicU64; -use std::sync::{Arc, RwLockReadGuard}; +use std::sync::Arc; use std::time::{Duration, SystemTime}; use std::vec; use tokio::macros::support::thread_rng_n; -use tokio::sync::{RwLock, RwLockWriteGuard}; -use tokio::time::sleep; +use tokio::sync::RwLock; #[derive(Clone)] pub(crate) struct LevelsController { @@ -235,7 +232,7 @@ impl LevelsController { let pick: Vec = self.pick_compact_levels(); info!("Try to compact levels, {:?}", pick); if pick.is_empty() { - zero_level_compact_chan.try_send(()); + _ = zero_level_compact_chan.try_send(()); } for p in pick { match self.do_compact(p.clone()).await { @@ -243,7 +240,7 @@ impl LevelsController { info!("Succeed to compacted"); if p.level == 0 { // zero level has compacted, memory SkipList can continue handle *write request* - zero_level_compact_chan.try_send(()); + _ = zero_level_compact_chan.try_send(()); } }, Ok(false) => { @@ -262,14 +259,14 @@ impl LevelsController { info!("Try to compact levels, {:?}", pick); if pick.is_empty() { // No table need to compact, notify `KV` continue handle *write request* - zero_level_compact_chan.try_send(()); + _ = zero_level_compact_chan.try_send(()); } for p in pick { match self.do_compact(p.clone()).await { Ok(true) => { info!("Succeed to compacted"); if p.level == 0 { - zero_level_compact_chan.try_send(()); + _ = zero_level_compact_chan.try_send(()); } }, Ok(false) => { @@ -442,7 +439,7 @@ impl LevelsController { let notify_try_compact_chan = self.notify_try_compact_chan.tx(); while !self.levels[0].try_add_level0_table(table.clone()).await { // Notify compact job - notify_try_compact_chan.try_send(()); + _ = notify_try_compact_chan.try_send(()); // Stall. Make sure all levels are healthy before we unstall. let mut start_time = SystemTime::now(); { @@ -455,7 +452,12 @@ impl LevelsController { .unwrap() .as_millis() ); - info!("{:?}, {}", self.opt, self.levels[0].num_tables()); + info!( + "{:?}, {}, cost: {:?}", + self.opt, + self.levels[0].num_tables(), + start_time.elapsed().unwrap() + ); let c_status = self.c_status.rl(); for i in 0..self.opt.max_levels { info!( @@ -537,7 +539,7 @@ impl LevelsController { let mut g = WaitGroup::new(); let execute_time = SystemTime::now(); defer! { - let cost = SystemTime::now().duration_since(execute_time).unwrap().as_millis(); + let _cost = SystemTime::now().duration_since(execute_time).unwrap().as_millis(); } { let cd = cd.read().await; @@ -558,7 +560,7 @@ impl LevelsController { } else { assert_eq!(1, top_tables.len()); } - let is_empty = bot_tables.is_empty(); + let _is_empty = bot_tables.is_empty(); for tb in top_tables { let iter = Box::new(IteratorImpl::new(tb, false)); itr.push(iter); @@ -574,39 +576,20 @@ impl LevelsController { // mitr.export_disk_ext(); // } mitr.rewind(); - let tid = random::(); - let mut count = 0; - let cur = tokio::runtime::Handle::current(); + let mut _count = 0; + let _cur = tokio::runtime::Handle::current(); loop { // #[cfg(test)] // let mut keys = vec![]; let start_time = SystemTime::now(); let mut builder = Builder::default(); while let Some(value) = mitr.peek() { - count += 1; + _count += 1; assert!(builder.add(value.key(), value.value()).is_ok()); mitr.next(); if builder.reached_capacity(self.opt.max_table_size) { break; } - - // #[cfg(test)] - // { - // // error!("merge, mitr{}, key {}", mitr.id(), hex_str(value.key())); - // { - // crate::test_util::push_log( - // format!( - // "tid:{}, mitr:{}, key:{}", - // tid, - // mitr.id(), - // hex_str(value.key()) - // ) - // .as_bytes(), - // false, - // ); - // } - // keys.push(value.key().to_vec()); - // } } if builder.is_zero_bytes() { warn!("Builder is empty"); @@ -916,15 +899,6 @@ impl LevelsController { let id = self.next_file_id.fetch_add(1, Ordering::Relaxed); id } - - pub(crate) fn print_level_fids(&self) { - let sz = self - .levels - .iter() - .map(|lv| lv.num_tables()) - .collect::>(); - warn!("every level table's size: {:?}", sz); - } } #[derive(Debug, Clone)] diff --git a/src/lib.rs b/src/lib.rs index 81055dd..96c1cea 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,6 @@ #![feature(type_alias_impl_trait)] #![feature(strict_provenance_atomic_ptr)] #![feature(atomic_from_mut)] -#![feature(cursor_remaining)] #![feature(pattern)] #![feature(cell_leak)] #![feature(path_file_prefix)] @@ -57,8 +56,6 @@ mod st_manager; #[cfg(test)] mod test_util; mod transition; -#[cfg(test)] -mod transition_test; pub use iterator::*; pub use kv::*; diff --git a/src/table/builder.rs b/src/table/builder.rs index 443b7fb..876920d 100644 --- a/src/table/builder.rs +++ b/src/table/builder.rs @@ -2,9 +2,7 @@ use crate::y::{hash, is_eof, Decode, Encode, ValueStruct}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use drop_cell::defer; use growable_bloom_filter::GrowableBloom; -use log::{debug, info}; use serde_json; -use std::hash::Hasher; use std::io::{Cursor, Read, Write}; use std::time::SystemTime; diff --git a/src/test_util.rs b/src/test_util.rs index cabb1b8..64810f5 100644 --- a/src/test_util.rs +++ b/src/test_util.rs @@ -1,20 +1,16 @@ -use atomic::Atomic; +use crate::Options; use chrono::Local; -use log::{info, kv::source::as_map, kv::Source, warn, Level}; +use log::{info, warn}; use rand::random; -use std::collections::HashMap; use std::env::temp_dir; use std::fs::create_dir_all; use std::io; -use std::sync::atomic::{AtomicI32, AtomicU64, Ordering}; +use std::sync::atomic::Ordering; use std::sync::Arc; -use std::time::Duration; -use tokio::runtime::Handle; use tokio_metrics::TaskMonitor; use tracing_subscriber::fmt::format::Writer; use tracing_subscriber::fmt::time::FormatTime; use tracing_subscriber::EnvFilter; -use crate::Options; #[cfg(test)] pub fn push_log_by_filename(fpath: &str, buf: &[u8]) { @@ -34,7 +30,7 @@ pub fn push_log(buf: &[u8], rd: bool) { // #[cfg(test)] // return; use std::io::Write; - let mut fpath = "raw_log.log"; + let fpath = "raw_log.log"; let mut fp = if !rd { std::fs::File::options() .write(true) @@ -57,13 +53,11 @@ pub fn push_log(buf: &[u8], rd: bool) { #[cfg(test)] pub fn remove_push_log() { use std::fs::remove_file; - remove_file("raw_log.log"); + remove_file("raw_log.log").unwrap(); } #[cfg(test)] pub(crate) fn tracing_log() { - use libc::remove; - use tracing::{info, Level}; use tracing_subscriber; struct LocalTimer; @@ -100,23 +94,6 @@ pub(crate) fn tracing_log() { // let recorder = metrics_prometheus::install(); } -#[cfg(test)] -pub(crate) async fn start_metrics() -> TaskMonitor { - let monitor = tokio_metrics::TaskMonitor::new(); - // print task metrics every 500ms - { - let frequency = std::time::Duration::from_millis(500); - let monitor = monitor.clone(); - tokio::spawn(async move { - for metrics in monitor.intervals() { - warn!("{:?}", metrics); - tokio::time::sleep(frequency).await; - } - }); - } - monitor -} - pub fn random_tmp_dir() -> String { let id = random::(); let path = temp_dir().join(id.to_string()).join("badger"); @@ -129,24 +106,6 @@ pub fn create_random_tmp_dir() -> String { fpath } -#[test] -fn it_work() { - #[tracing::instrument(skip_all)] - fn call() { - info!("call c"); - } - - #[tracing::instrument(skip_all)] - fn my_function(my_arg: usize) { - info!("execute my function"); - call(); - } - - tracing_log(); - my_function(1000); - info!("Hello Body"); -} - #[tokio::test] async fn runtime_tk() { use tokio::{sync::RwLock, task::JoinHandle}; @@ -198,7 +157,7 @@ fn tk2() { let a = Arc::new(std::sync::atomic::AtomicI32::new(10000000)); let ac = a.clone(); rt.block_on(async move { - for i in 0..10000 { + for _ in 0..10000 { let ac = ac.clone(); tokio::spawn(async move { ac.fetch_sub(1, Ordering::Relaxed); @@ -215,11 +174,8 @@ fn tk2() { println!("return {}", ret); }); println!("{}", a.load(Ordering::Relaxed)); - - use itertools::Merge; } - pub(crate) fn get_test_option(dir: &str) -> Options { let mut opt = Options::default(); opt.max_table_size = 1 << 15; // Force more compaction. diff --git a/src/value_log.rs b/src/value_log.rs index fadb115..aa5a2a2 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -1,34 +1,24 @@ use async_channel::Receiver; -use atomic::Atomic; - -use bitflags::{bitflags, Flags}; +use bitflags::bitflags; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use bytes::BufMut; +use bytes::Buf; use crc32fast::Hasher; use drop_cell::defer; use getset::{Getters, Setters}; - -use log::kv::Source; -use log::{debug, info}; +use log::info; use memmap::Mmap; - use rand::random; - use std::collections::{HashMap, HashSet}; -use std::fmt::{Debug, Display, Formatter}; +use std::fmt::{Debug, Formatter}; use std::fs::{read_dir, remove_file}; use std::future::Future; -use std::io::{BufRead, Cursor, Read, Seek, SeekFrom, Write}; - +use std::io::{Cursor, Read, Seek, SeekFrom, Write}; use std::mem::size_of; -use std::ops::{Deref, Index}; +use std::ops::Index; use std::path::Path; use std::pin::Pin; - use std::sync::atomic::{AtomicI32, AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; - -use std::num::NonZeroU32; use std::time::{Duration, SystemTime}; use std::{fmt, fs, io, ptr}; @@ -41,7 +31,7 @@ use crate::options::Options; use crate::types::{Channel, Closer, TArcRW}; use crate::y::{create_synced_file, open_existing_synced_file, sync_directory, Decode, Encode}; use crate::Error::Unexpected; -use crate::{event, hex_str, Error, Result, EMPTY_SLICE}; +use crate::{event, Error, Result, EMPTY_SLICE}; bitflags! { /// Values have their first byte being byteData or byteDelete. This helps us distinguish between @@ -758,15 +748,6 @@ impl ValueLogCore { continue; } - #[cfg(test)] - debug!( - "Write # {:?} => {} into vlog file, offset: {}, meta:{}", - hex_str(&entry.entry().key), - hex_str(&entry.entry().value), - self.buf.read().await.get_ref().len() - + self.writable_log_offset.load(Ordering::Acquire) as usize, - entry.entry.meta, - ); let mut ptr = ValuePointer::default(); ptr.fid = cur_fid; // Use the offset including buffer length so far. @@ -785,9 +766,9 @@ impl ValueLogCore { } } { - assert!(wt_count <= 0 || !self.buf.read().await.is_empty()); + assert!(wt_count <= 0 || self.buf.read().await.has_remaining()); let mut buffer = self.buf.write().await; - if buffer.is_empty() { + if !buffer.has_remaining() { return Ok(()); } // write value pointer into vlog file. (Just only write to mmap) @@ -1082,11 +1063,6 @@ impl ValueLogCore { } count += 1; - #[cfg(test)] - if count == 1 { - debug!("merge from {}", vptr.offset); - } - // TODO confiure if count % 100 == 0 { tokio::time::sleep(Duration::from_millis(1)).await;