diff --git a/Cargo.toml b/Cargo.toml
index 8a626bb..db96862 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,8 +14,8 @@ tokio = { version = "1.29.1", features = ["full", "tracing"] }
 byteorder = "1.4.3"
 rand = "0.8.5"
 maligned = "0.2.1"
-atomic = "0.5.3"
-tabled = { version = "0.12.2", features = ["ansi-str", "color"] }
+atomic = "0.6.0"
+tabled = { version = "0.14.0", features = ["ansi-str", "color"] }
 memmap = "0.7.0"
 bytes = "1.4.0"
 bloom = "0.3.2"
@@ -30,7 +30,7 @@ parking_lot = "0.12.1"
 bitflags = "2.3.3"
 libc = "0.2.147"
 log = { version = "0.4.19", features = ["kv_unstable", "kv_unstable_serde", "kv_unstable_sval"] }
-async-channel = "1.9.0"
+async-channel = "2.0.0"
 file-guard = "0.1.0"
 fs2 = "0.4.3"
 awaitgroup = "0.7.0"
@@ -45,13 +45,13 @@ eieio = "1.0.0"
 either = "1.8.1"
 enum-unitary = "0.5.0"
 atom_box = "0.1.2"
-console-subscriber = "0.1.10"
+console-subscriber = "0.2.0"
 uuid = { version = "1.4.1", features = ["v5", "v4"] }
 winapi = "0.3.9"
 itertools = "0.11.0"
-tokio-metrics = "0.2.2"
+tokio-metrics = "0.3.1"
 metrics = "0.21.1"
-metrics-prometheus = "0.4.1"
+metrics-prometheus = "0.5.0"
 prometheus = "0.13.3"
 lazy_static = "1.4.0"
 getset = "0.1.2"
@@ -60,14 +60,18 @@ async-stream = "0.3.5"
 futures-core = "0.3.28"
 backtrace-on-stack-overflow = "0.3.0"
 protobuf = { version = "3.0.0-alpha.2", features = ["with-bytes"] }
+clap = { version = "4.4.6", features = ["derive"] }
+structopt = "0.3.26"
+farmhash = "1.1.5"
+arc-cell = "0.3.3"
 
 [dev-dependencies]
 tracing-subscriber = "0.3.17"
-tracing-log = "0.1.3"
+tracing-log = "0.2.0"
 chrono = "0.4.26"
 env_logger = "0.10.0"
 console_log = { version = "1.0.0", features = ["color"] }
 itertools = "0.11.0"
-tokio-metrics = { version = "0.2.2", default-features = false }
+tokio-metrics = { version = "0.3.1", default-features = false }
 tokio = { version = "1.29.1", features = ["full", "rt", "time", "macros", "test-util"] }
 criterion = { version = "0.5.1", features = ["tokio"] }
diff --git a/examples/badger.rs b/examples/badger.rs
index a787c90..8453593 100644
--- a/examples/badger.rs
+++ b/examples/badger.rs
@@ -1,20 +1,39 @@
+use std::path::PathBuf;
+use structopt::StructOpt;
+
+
+#[derive(StructOpt)]
+#[structopt(about = "BadgerDB demo")]
+enum Opt {
+    BackUp,
+}
+
+
 #[tokio::main]
 async fn main() {
+    let env = tracing_subscriber::EnvFilter::from_default_env();
     tracing_subscriber::FmtSubscriber::builder()
         .with_env_filter(env)
         .try_init()
         .unwrap();
-    let opt = badger_rs::Options::default();
-    let kv = badger_rs::KV::open(opt).await.unwrap();
-    kv.set(
-        b"hello word".to_vec(),
-        b">>>>>I LOVE YOU!<<<<<".to_vec(),
-        0x0,
-    )
-    .await
-    .unwrap();
-
-    let got = kv.get(b"hello word").await.unwrap();
-    println!("{}", String::from_utf8_lossy(&got));
-}
+
+
+    let opt = Opt::from_args();
+    match opt {
+        Opt::BackUp => {
+            let opt = badger_rs::Options::default();
+            let kv = badger_rs::DB::open(opt).await.unwrap();
+            kv.set(
+                b"hello world".to_vec(),
+                b">>>>>I LOVE YOU!<<<<<".to_vec(),
+                0x0,
+            )
+            .await
+            .unwrap();
+
+            let got = kv.get(b"hello world").await.unwrap();
+            println!("{}", String::from_utf8_lossy(&got));
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/backup.rs b/src/backup.rs
index 1bc0829..7fa4493 100644
--- a/src/backup.rs
+++ b/src/backup.rs
@@ -1,9 +1,12 @@
-use std::io::Write;
+use crate::pb::backup::KVPair;
 use byteorder::{LittleEndian, WriteBytesExt};
 use protobuf::Message;
-use crate::pb::backup::KVPair;
+use std::io::Write;
 
-pub fn write_to<W>(entry: &KVPair, wt: &mut W) -> crate::Result<()> where W: Write {
+pub fn write_to<W>(entry: &KVPair, wt: &mut W) -> crate::Result<()>
+where
+    W: Write,
+{
     let buf = entry.write_to_bytes().unwrap();
     wt.write_u64::<LittleEndian>(buf.len() as u64)?;
     wt.write_all(&buf)?;
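For reference, `write_to` frames each backup record as a little-endian `u64` length prefix followed by the protobuf-encoded `KVPair` body. Below is a minimal sketch of the matching read side; `read_entry` is a hypothetical helper, not part of this patch, and it assumes `crate::Error` converts from `io::Error` (which the `?` operators in `write_to` already rely on):

```rust
use byteorder::{LittleEndian, ReadBytesExt};
use protobuf::Message;
use std::io::Read;

use crate::pb::backup::KVPair;

// Hypothetical inverse of `write_to`: decode one length-prefixed KVPair.
pub fn read_entry<R: Read>(rd: &mut R) -> crate::Result<KVPair> {
    let len = rd.read_u64::<LittleEndian>()?; // 8-byte little-endian length prefix
    let mut buf = vec![0u8; len as usize];
    rd.read_exact(&mut buf)?;
    // The body is the protobuf-encoded KVPair written by `write_to`.
    Ok(KVPair::parse_from_bytes(&buf).unwrap())
}
```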
diff --git a/src/iterator.rs b/src/iterator.rs
index cb83cdd..46e387c 100644
--- a/src/iterator.rs
+++ b/src/iterator.rs
@@ -1,24 +1,28 @@
 use crate::iterator::PreFetchStatus::Prefetched;
 use crate::kv::_BADGER_PREFIX;
 use crate::types::{ArcRW, Channel, Closer, TArcMx, TArcRW};
-use crate::{hex_str, ValueStruct, KV};
+use crate::{hex_str, ValueStruct, DB};
 use crate::{
     value_log::{MetaBit, ValuePointer},
     Decode, MergeIterator, Result, Xiterator, EMPTY_SLICE,
 };
+use arc_cell::ArcCell;
 use atomic::Atomic;
-use std::fmt::{Debug, Display, Formatter, Pointer};
+use core::slice::SlicePattern;
+use std::fmt::{Debug, Display, Formatter};
 use std::future::Future;
-use std::pin::{pin, Pin};
+use std::pin::Pin;
+use crate::transition::{BoxTxN, TxN};
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use std::{io::Cursor, sync::atomic::AtomicU64};
 use tokio::io::AsyncWriteExt;
 use tokio::sync::{RwLockReadGuard, RwLockWriteGuard};
+use tracing::info;
 
 #[derive(Debug, PartialEq, Copy, Clone)]
 pub(crate) enum PreFetchStatus {
@@ -38,13 +42,6 @@ impl From<KVItemInner> for KVItem {
         }
     }
 }
-// impl Deref for KVItem {
-//     type Target = tokio::sync::RwLock<KVItemInner>;
-//
-//     fn deref(&self) -> &Self::Target {
-//         self.inner.as_ref()
-//     }
-// }
 
 impl KVItem {
     pub async fn key(&self) -> Vec<u8> {
@@ -62,6 +59,8 @@ impl KVItem {
         inner.has_value()
     }
 
+    /// Returns the CAS counter associated with the value.
+    /// TODO: Make this the version.
     pub async fn counter(&self) -> u64 {
         let inner = self.rl().await;
         inner.counter()
     }
@@ -85,14 +84,14 @@ impl KVItem {
 // iterator.next() is called.
 #[derive(Clone)]
 pub(crate) struct KVItemInner {
-    status: Arc<Atomic<PreFetchStatus>>,
-    kv: KV,
-    key: Vec<u8>,
-    // TODO, Opz memory
-    vptr: Vec<u8>,
-    value: TArcMx<Vec<u8>>,
-    meta: u8,
-    user_meta: u8,
+    pub(crate) status: Arc<std::sync::RwLock<PreFetchStatus>>,
+    kv: DB,
+    pub(crate) key: Vec<u8>,
+    pub(crate) vptr: Vec<u8>,
+    pub(crate) value: TArcMx<Vec<u8>>,
+    pub(crate) meta: u8,
+    pub(crate) user_meta: u8,
+    // TODO: rename to version ts.
     cas_counter: Arc<AtomicU64>,
     wg: Closer,
     err: Result<()>,
@@ -121,9 +120,9 @@ impl Debug for KVItemInner {
     }
 }
 
 impl KVItemInner {
-    pub(crate) fn new(key: Vec<u8>, value: ValueStruct, kv: KV) -> KVItemInner {
+    pub(crate) fn new(key: Vec<u8>, value: ValueStruct, kv: DB) -> KVItemInner {
         Self {
-            status: Arc::new(Atomic::new(PreFetchStatus::Empty)),
+            status: Arc::new(std::sync::RwLock::new(PreFetchStatus::Empty)),
             kv,
             key,
             value: TArcMx::new(Default::default()),
@@ -138,7 +137,7 @@ impl KVItemInner {
     // Returns the key. Remember to copy if you need to access it outside the iteration loop.
     pub(crate) fn key(&self) -> &[u8] {
-        &self.key
+        crate::y::parse_key(self.key.as_ref())
     }
 
     // Return value
@@ -167,7 +166,7 @@ impl KVItemInner {
     ) -> Result<()> {
         // Wait result
         self.wg.wait().await;
-        if self.status.load(Ordering::Acquire) == Prefetched {
+        if *self.status.read().unwrap() == Prefetched {
             if self.err.is_err() {
                 return self.err.clone();
             }
@@ -199,7 +198,7 @@ impl KVItemInner {
         let value = value.to_vec();
         let value_wl = self.value.clone();
         Box::pin(async move {
-            status_wl.store(Prefetched, Ordering::Release);
+            *status_wl.write().unwrap() = Prefetched;
             if value.is_empty() {
                 return Ok(());
             }
@@ -267,6 +266,7 @@ impl Default for IteratorOptions {
 
 impl IteratorOptions {
     pub fn new(pre_fetch_values: bool, pre_fetch_size: isize, reverse: bool) -> Self {
+        assert!(pre_fetch_size > 0);
         IteratorOptions {
             pre_fetch_values,
             pre_fetch_size,
@@ -287,13 +287,16 @@ pub(crate) const DEF_ITERATOR_OPTIONS: IteratorOptions = IteratorOptions {
 /// |            |       |
 /// IteratorExt reference
 pub struct IteratorExt {
-    kv: KV,
+    txn: BoxTxN,
+    read_ts: u64,
+
+    // kv: DB,
     itr: MergeIterator,
     opt: IteratorOptions,
-    item: ArcRW<Option<KVItem>>,
-    // Cache the prefetch keys, not inlcude current value
+    // Cache of prefetched items, not including the current value
     data: ArcRW<VecDeque<KVItem>>,
     has_rewind: ArcRW<bool>,
+    last_key: Arc<ArcCell<Vec<u8>>>,
 }
 
 /// TODO FIXME
@@ -329,14 +332,15 @@ pub struct IteratorExt {
 // }
 
 impl IteratorExt {
-    pub(crate) fn new(kv: KV, itr: MergeIterator, opt: IteratorOptions) -> IteratorExt {
+    pub(crate) fn new(tv: *const TxN, itr: MergeIterator, opt: IteratorOptions) -> IteratorExt {
         IteratorExt {
-            kv,
+            txn: BoxTxN::new(tv),
             opt,
             itr,
             data: ArcRW::default(),
-            item: Arc::new(Default::default()),
             has_rewind: ArcRW::default(),
+            read_ts: 0,
+            last_key: Arc::new(Default::default()),
         }
     }
 
@@ -356,14 +360,9 @@ impl IteratorExt {
         while let Some(el) = self.data.write().pop_front() {
             el.rl().await.wg.wait().await;
         }
-        while let Some(el) = self.itr.seek(key) {
-            if el.key().starts_with(_BADGER_PREFIX) {
-                continue;
-            }
-            break;
-        }
+        self.itr.seek(key);
         self.pre_fetch().await;
-        self.item.read().clone()
+        self.peek().await
     }
 
     // Rewind the iterator cursor all the wy to zero-th position, which would be the
@@ -375,80 +374,42 @@ impl IteratorExt {
             el.rl().await.wg.wait().await;
         }
         // rewind the iterator
-        // rewind, next, rewind?, thie item is who!
-        let mut item = self.itr.rewind();
-        // filter internal data
-        while item.is_some() && item.as_ref().unwrap().key().starts_with(_BADGER_PREFIX) {
-            item = self.itr.next();
-        }
-        // Before every rewind, the item will be reset to None
-        self.item.write().take();
+        // (internal keys are now filtered by parse_item during prefetch)
+        self.itr.rewind();
         // prefetch item.
         self.pre_fetch().await;
         // return the first el.
-        self.item.read().clone()
+        self.peek().await
     }
 
     // Advance the iterator by one (*NOTICE*: must be rewind when you call self.next())
+    // to ensure you have access to a valid it.item()
     pub async fn next(&self) -> Option<KVItem> {
-        // Ensure current item has load
-        if let Some(el) = self.item.write().take() {
-            el.rl().await.wg.wait().await; // Just cleaner to wait before pushing to avoid doing ref counting.
-        }
-        // Set next item to current
-        if let Some(el) = self.data.write().pop_front() {
-            self.item.write().replace(el);
-        }
         // Advance internal iterator until entry is not deleted
-        while let Some(el) = self.itr.next() {
-            if el.key().starts_with(_BADGER_PREFIX) {
-                continue;
-            }
-            if el.value().meta & MetaBit::BIT_DELETE.bits() == 0 {
+        while self.itr.next().is_some() {
+            if self.parse_item().await {
                 // Not deleted
                 break;
             }
         }
-        let item = self.itr.peek();
-        if item.is_none() {
-            return None;
-        }
-
-        let xitem = self.new_item();
-        self.fill(xitem.clone()).await;
-        self.data.write().push_back(xitem.clone());
-        Some(xitem)
+        self.peek().await
     }
 
     pub async fn peek(&self) -> Option<KVItem> {
-        self.item.read().clone()
+        self.data.read().front().map(|item| item.clone())
     }
 }
 
 impl IteratorExt {
-    // Returns false when iteration is done
-    // or when the current key is not prefixed by the specified prefix.
-    async fn valid_for_prefix(&self, prefix: &[u8]) -> bool {
-        self.item.read().is_some()
-            && self
-                .item
-                .read()
-                .as_ref()
-                .unwrap()
-                .rl()
-                .await
-                .key()
-                .starts_with(prefix)
-    }
-
     // Close the iterator, It is important to call this when you're done with iteration.
     pub async fn close(&self) -> Result<()> {
         // TODO: We could handle this error.
-        self.kv.vlog.as_ref().unwrap().decr_iterator_count().await?;
+        let db = unsafe { self.txn.tx.as_ref().unwrap().get_kv() };
+        db.vlog.as_ref().unwrap().decr_iterator_count().await?;
         Ok(())
     }
 
-    // fill the value
+    // fill the value of the merge iterator into the item
     async fn fill(&self, item: KVItem) {
         let vs = self.itr.peek().unwrap();
         let vs = vs.value();
@@ -488,35 +449,22 @@ impl IteratorExt {
         let itr = &self.itr;
         let mut count = 0;
         while let Some(item) = itr.peek() {
-            if item.key().starts_with(crate::kv::_BADGER_PREFIX) {
-                itr.next();
-                continue;
-            }
-            if item.value().meta & MetaBit::BIT_DELETE.bits() > 0 {
-                itr.next();
+            if !self.parse_item().await {
                 continue;
             }
             count += 1;
-            let xitem = self.new_item();
-            // fill a el from itr.peek
-            self.fill(xitem.clone()).await;
-            if self.item.read().is_none() {
-                self.item.write().replace(xitem); // store it
-            } else {
-                // push prefetch el into cache queue, Notice it not including current item
-                self.data.write().push_back(xitem);
-            }
             if count == pre_fetch_size {
                 break;
             }
-            itr.next();
         }
+        info!("Prefetched item count: {}", count);
     }
 
     fn new_item(&self) -> KVItem {
+        let kv = self.txn().get_kv();
         let inner_item = KVItemInner {
-            status: Arc::new(Atomic::new(PreFetchStatus::Empty)),
-            kv: self.kv.clone(),
+            status: Arc::new(std::sync::RwLock::new(PreFetchStatus::Empty)),
+            kv,
             key: vec![],
             value: TArcMx::new(Default::default()),
             vptr: vec![],
@@ -529,8 +477,91 @@ impl IteratorExt {
         return KVItem::from(inner_item);
     }
 
-    // Returns false when iteration is done.
-    fn valid(&self) -> bool {
-        self.item.read().is_some()
+    // This is a complex function because it needs to handle both forward and reverse iteration
+    // implementations. We store keys so that their versions are sorted in descending order. This makes
+    // forward iteration efficient, but reverse iteration complicated. This tradeoff is better because
+    // forward iteration is more common than reverse.
+    //
+    // This function advances the iterator.
+    // TODO ...
+    async fn parse_item(&self) -> bool {
+        let itr = &self.itr;
+        let item = itr.peek().unwrap();
+        // Skip badger keys.
+        if item.key().starts_with(_BADGER_PREFIX) {
+            itr.next();
+            return false;
+        }
+        // Skip any versions that are *beyond* the read_ts.
+        let ver = crate::y::parse_ts(item.key());
+        info!(
+            "key: {}, version: {}, read_ts: {}",
+            hex_str(item.key()),
+            ver,
+            self.read_ts
+        );
+        if ver > self.read_ts {
+            itr.next();
+            return false;
+        }
+        // If iterating in the forward direction, just checking the last key against the
+        // current key is sufficient.
+        if !self.opt.reverse {
+            // This key has already been handled, so don't yield it again.
+            // TODO FIXME: I don't think so; consider a 5, b 7 (del), b 5, b 6.
+            if crate::y::same_key_ignore_version(self.get_last_key().as_slice(), item.key()) {
+                itr.next();
+                return false;
+            }
+            // Only track in the forward direction.
+            // We should update last_key as soon as we find a different key in our snapshot.
+            // Consider keys: a 5, b 7 (del), b 5. When iterating, last_key = a.
+            // Then we see b 7, which is deleted. If we don't store last_key = b, we'll then return b 5,
+            // which is wrong. Therefore, update last_key here.
+            self.last_key
+                .set(Arc::new(itr.peek().unwrap().key().to_vec()));
+        }
+
+        loop {
+            // If deleted, advance and return.
+            if itr.peek().unwrap().value().meta & MetaBit::BIT_DELETE.bits() > 0 {
+                itr.next();
+                return false;
+            }
+
+            let item = self.new_item();
+            // Fill the item from the current cursor position; the value may be loaded lazily.
+            self.fill(item.clone()).await;
+            itr.next(); // Advance the cursor, but do not fill an item yet.
+            if !self.opt.reverse || itr.peek().is_none() {
+                // Forward direction, or no more candidates.
+                self.data.write().push_back(item);
+                return true;
+            }
+
+            // Reverse direction.
+            let next_ts = crate::y::parse_ts(itr.peek().as_ref().unwrap().key());
+            if next_ts <= self.read_ts
+                && crate::y::same_key_ignore_version(
+                    itr.peek().unwrap().key(),
+                    item.key().await.as_ref(),
+                )
+            {
+                // This is a valid potential candidate.
+                continue;
+            }
+            // Ignore the next candidate. Return the current one.
+            self.data.write().push_back(item);
+            return true;
+        }
+    }
+
+    fn txn(&self) -> &TxN {
+        unsafe { self.txn.tx.as_ref().unwrap() }
+    }
+
+    fn get_last_key(&self) -> Arc<Vec<u8>> {
+        self.last_key.get()
     }
 }
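To make the new `parse_item` flow concrete: versions of a key are stored newest-first, a version is visible only when `ver <= read_ts`, and a tombstone on the newest visible version must also suppress every older version of that key — exactly the `a 5, b 7 (del), b 5` case from the comment above. Here is a standalone model of the forward scan, using plain tuples instead of the crate's types:

```rust
/// Illustrative model of `parse_item`'s forward scan: input is sorted with
/// versions of each key in descending order; the newest visible version
/// decides the key's fate, and older versions are skipped via last_key.
fn scan(entries: &[(&str, u64, bool)], read_ts: u64) -> Vec<String> {
    let mut out = Vec::new();
    let mut last_key: Option<&str> = None;
    for &(key, ver, deleted) in entries {
        if ver > read_ts {
            continue; // version beyond the snapshot
        }
        if last_key == Some(key) {
            continue; // this key was already decided by a newer version
        }
        // Track the key even if it is a tombstone, so b@5 cannot resurrect b.
        last_key = Some(key);
        if !deleted {
            out.push(key.to_string());
        }
    }
    out
}

fn main() {
    // a@5, b@7 (tombstone), b@5 — with read_ts = 10 only "a" survives.
    let entries = [("a", 5, false), ("b", 7, true), ("b", 5, false)];
    assert_eq!(scan(&entries, 10), vec!["a"]);
}
```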
diff --git a/src/kv.rs b/src/kv.rs
index e699dba..b3bb088 100644
--- a/src/kv.rs
+++ b/src/kv.rs
@@ -34,6 +34,7 @@ use std::path::Path;
 use std::pin::Pin;
 
 use crate::pb::backup::KVPair;
+use crate::transition::{GlobalTxNState, TxN};
 use libc::difftime;
 use rand::random;
 use std::fmt::Formatter;
@@ -42,14 +43,17 @@ use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 use std::{fmt, string, vec};
+use structopt::clap::AppSettings::WaitOnError;
 use tokio::fs::create_dir_all;
 use tokio::io::AsyncWriteExt;
 use tokio::sync::{RwLock, RwLockWriteGuard};
 
-///
-pub const _BADGER_PREFIX: &[u8; 8] = b"!badger!";
 /// Prefix for internal keys used by badger.
-pub const _HEAD: &[u8; 12] = b"!badger!head"; // For Storing value offset for replay.
+pub const _BADGER_PREFIX: &[u8; 8] = b"!badger!";
+// For storing value offset for replay.
+pub const _HEAD: &[u8; 12] = b"!badger!head";
+/// For indicating the end of entries in a txn.
+pub const TXN_KEY: &[u8; 11] = b"!badger!txn";
 
 pub const KV_WRITE_CH_CAPACITY: usize = 1000;
 
@@ -75,12 +79,6 @@ impl FlushTask {
     }
 }
 
-/// A builder for KV building.
-pub struct KVBuilder {
-    opt: Options,
-    kv: BoxKV,
-}
-
 /// Manage key/value
 #[doc(hidden)]
 #[derive(Clone)]
 pub struct KVCore {
@@ -107,6 +105,7 @@ pub struct KVCore {
     // we use an atomic op.
     pub(crate) last_used_cas_counter: Arc<AtomicU64>,
     share_lock: TArcRW<()>,
+    pub(crate) txn_state: GlobalTxNState,
 }
 
 impl Drop for KVCore {
@@ -386,7 +385,7 @@ impl KVCore {
             count += 1;
             sz += self.opt.estimate_size(&entry) as u64;
             req.entries.push(EntryType::from(entry));
-            req.ptrs.push(Arc::new(Atomic::new(None)));
+            req.ptrs.push(Arc::new(std::sync::RwLock::new(None)));
             req_index.push(i);
         }
 
@@ -496,7 +495,7 @@ impl KVCore {
                 // Will include deletion/tombstone case.
                 debug!("Lsm ok, the value not at vlog file");
             } else {
-                let ptr = req.ptrs.get(i).unwrap().load(Ordering::Relaxed);
+                let ptr = req.ptrs.get(i).unwrap().read().unwrap();
                 let ptr = ptr.unwrap();
                 let mut wt = Cursor::new(vec![0u8; ValuePointer::value_pointer_encoded_size()]);
                 ptr.enc(&mut wt).unwrap();
@@ -571,12 +570,12 @@ impl KVCore {
         Ok(())
     }
 
-    async fn update_offset(&self, ptrs: &mut Vec<Arc<Atomic<Option<ValuePointer>>>>) {
+    async fn update_offset(&self, ptrs: &mut Vec<Arc<std::sync::RwLock<Option<ValuePointer>>>>) {
         // #[cfg(test)]
         // warn!("Ready to update offset");
         let mut ptr = ValuePointer::default();
         for tmp_ptr in ptrs.iter().rev() {
-            let tmp_ptr = tmp_ptr.load(Ordering::Acquire);
+            let tmp_ptr = tmp_ptr.read().unwrap();
             if tmp_ptr.is_none() || tmp_ptr.unwrap().is_zero() {
                 continue;
             }
@@ -596,7 +595,10 @@ impl KVCore {
     }
 
     // Returns the current `mem_tables` and get references(here will incr mem table reference).
-    fn get_mem_tables<'a>(&'a self, p: &'a crossbeam_epoch::Guard) -> Vec<crossbeam_epoch::Shared<'a, SkipList>> {
+    pub(crate) fn get_mem_tables<'a>(
+        &'a self,
+        p: &'a crossbeam_epoch::Guard,
+    ) -> Vec<crossbeam_epoch::Shared<'a, SkipList>> {
         self.mem_st_manger.lock_exclusive();
         defer! {self.mem_st_manger.unlock_exclusive()}
 
@@ -652,7 +654,7 @@ impl KVCore {
         unsafe { &*st }
     }
 
-    fn must_vlog(&self) -> Arc<ValueLogCore> {
+    pub(crate) fn must_vlog(&self) -> Arc<ValueLogCore> {
         let vlog = self.vlog.clone().unwrap();
         vlog
     }
@@ -684,13 +686,13 @@
 pub type WeakKV = XWeak<KVCore>;
 
 /// DB handle
 /// ```
-/// use badger_rs::{Options, KV};
+/// use badger_rs::{Options, DB};
 /// use badger_rs::IteratorOptions;
 ///
 /// #[tokio::main]
 ///
 /// pub async fn main() {
-///    let kv = KV::open(Options::default()).await.unwrap();
+///    let kv = DB::open(Options::default()).await.unwrap();
 ///    kv.set(b"foo".to_vec(), b"bar".to_vec(), 0x0).await.unwrap();
 ///    let value = kv.get(b"foo").await.unwrap();
 ///    assert_eq!(&value, b"bar");
@@ -707,11 +709,11 @@ pub type WeakKV = XWeak<KVCore>;
 ///}
 /// ```
 #[derive(Clone)]
-pub struct KV {
+pub struct DB {
     inner: XArc<KVCore>,
 }
 
-impl Deref for KV {
+impl Deref for DB {
     type Target = KVCore;
 
     fn deref(&self) -> &Self::Target {
@@ -719,15 +721,15 @@ impl Deref for KV {
     }
 }
 
-impl fmt::Debug for KV {
+impl fmt::Debug for DB {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         f.debug_struct("KV").finish()
     }
 }
 
-impl KV {
+impl DB {
     /// Async open a KV db with Options
-    pub async fn open(mut opt: Options) -> Result<KV> {
+    pub async fn open(mut opt: Options) -> Result<DB> {
         opt.max_batch_size = (15 * opt.max_table_size) / 100;
         opt.max_batch_count = 2 * opt.max_batch_size / Node::align_size() as u64;
         create_dir_all(opt.dir.as_str()).await?;
@@ -783,6 +785,7 @@ impl DB {
             last_used_cas_counter: Arc::new(AtomicU64::new(1)),
             mem_st_manger: Arc::new(SkipListManager::new(opt.arena_size() as usize)),
             share_lock: TArcRW::new(tokio::sync::RwLock::new(())),
+            txn_state: GlobalTxNState::default(),
         };
 
         let manifest = out.manifest.clone();
@@ -805,7 +808,7 @@ impl DB {
         }
 
         out.vlog.replace(Arc::new(vlog));
-        let xout = KV::new(XArc::new(out));
+        let xout = DB::new(XArc::new(out));
 
         // update size
         {
@@ -1192,8 +1195,21 @@ impl KV {
         // Extend sst.table
         itrs.extend(self.must_lc().as_iterator(opt.reverse));
         let mitr = MergeIterOverBuilder::default().add_batch(itrs).build();
-        IteratorExt::new(self.clone(), mitr, opt)
+
+        IteratorExt::new(std::ptr::null(), mitr, opt)
+    }
+
+    pub fn new_transaction(&self, update: bool) -> Result<TxN> {
+        Ok(TxN {
+            read_ts: 0,
+            update,
+            reads: Arc::new(Default::default()),
+            writes: Arc::new(Default::default()),
+            pending_writes: Arc::new(Default::default()),
+            kv: self.clone(),
+        })
+    }
 
     /// Closes a KV. It's crucial to call it to ensure all the pending updates
     /// make their way to disk.
     pub async fn close(&self) -> Result<()> {
@@ -1261,7 +1277,14 @@ impl DB {
         Ok(())
     }
 
-    pub async fn backup<W>(&self, mut wt: W) -> Result<()>
+    /// Backup dumps a protobuf-encoded list of all the entries in the database that
+    /// are newer than the specified version into the given writer. It returns a
+    /// timestamp indicating when the entries were dumped, which can be passed into a
+    /// later invocation to generate an incremental dump of the entries that have been
+    /// added/modified since the last invocation of DB::backup().
+    ///
+    /// This can be used to back up the data in a database at a given point in time.
+    pub async fn backup<W>(&self, mut wt: W, since: u64) -> Result<()>
     where
         W: Write,
     {
@@ -1281,7 +1304,7 @@ impl DB {
     }
 }
 
-impl KV {
+impl DB {
     //     pub(crate) async fn new_std_iterator(
     //         &self,
     //         opt: IteratorOptions,
@@ -1331,8 +1354,8 @@ impl DB {
         self.inner.clone()
     }
 
-    pub(crate) fn new(inner: XArc<KVCore>) -> KV {
-        KV { inner }
+    pub(crate) fn new(inner: XArc<KVCore>) -> DB {
+        DB { inner }
     }
 
     pub(crate) fn to_ref(&self) -> &KVCore {
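A hedged sketch of how the new `new_transaction`/`TxN` API above is meant to be driven end to end. It assumes `TxN` ends up re-exported from the crate root (in this patch `mod transition` is still private), and the demo keys are arbitrary:

```rust
use badger_rs::{Options, DB};

#[tokio::main]
async fn main() {
    let db = DB::open(Options::default()).await.unwrap();
    // `update = true` makes the txn writable and tracks read fingerprints
    // so the oracle can detect conflicts at commit time.
    let txn = db.new_transaction(true).unwrap();
    txn.set(b"k1".to_vec(), b"v1".to_vec(), 0x0);
    let _item = txn.get(b"k1").await.unwrap(); // served from pending_writes
    // On a read-write conflict this returns Err(Error::TxCommitConflict).
    txn.commit().await.unwrap();
}
```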
diff --git a/src/kv_test.rs b/src/kv_test.rs
index 56bb296..78cfb67 100644
--- a/src/kv_test.rs
+++ b/src/kv_test.rs
@@ -17,7 +17,7 @@ use crate::test_util::{push_log, remove_push_log, tracing_log};
 use crate::types::{TArcMx, XArc};
 use crate::value_log::{Entry, MetaBit, MAX_KEY_SIZE};
 use crate::y::hex_str;
-use crate::{kv::KVCore, options::Options, Error, KV};
+use crate::{kv::KVCore, options::Options, Error, DB};
 
 fn get_test_option(dir: &str) -> Options {
     let mut opt = Options::default();
@@ -33,13 +33,20 @@ async fn t_1_write() {
     use crate::test_util::{random_tmp_dir, tracing_log};
     tracing_log();
     let dir = random_tmp_dir();
-    let kv = KV::open(get_test_option(&dir)).await;
+    let kv = DB::open(get_test_option(&dir)).await;
     let kv = kv.unwrap();
     let res = kv.set(b"hello".to_vec(), b"word".to_vec(), 10).await;
     assert!(res.is_ok());
     let got = kv.get(b"hello").await;
     assert!(got.is_ok());
     assert_eq!(&got.unwrap(), b"word");
+
+    // Write again
+    let res = kv.set(b"hello".to_vec(), b"word1".to_vec(), 20).await;
+    assert!(res.is_ok());
+    let got = kv.get(b"hello").await;
+    assert!(got.is_ok());
+    assert_eq!(&got.unwrap(), b"word1");
 }
 
 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
@@ -47,7 +54,7 @@ async fn t_batch_write() {
     use crate::test_util::{random_tmp_dir, tracing_log};
     tracing_log();
     let dir = random_tmp_dir();
-    let kv = KV::open(get_test_option(&dir)).await;
+    let kv = DB::open(get_test_option(&dir)).await;
     let kv = kv.unwrap();
     let n = 50000;
     let mut batch = vec![];
@@ -127,7 +134,7 @@ async fn t_batch_write() {
         "cost time: {}s",
         SystemTime::now().duration_since(start).unwrap().as_secs()
     );
-    kv.must_lc().print_level_fids();
+    //kv.must_lc().print_level_fids();
     kv.close().await.unwrap();
 }
 
@@ -136,7 +143,7 @@ async fn t_concurrent_write() {
     use crate::test_util::{random_tmp_dir, tracing_log};
     tracing_log();
     let dir = random_tmp_dir();
-    let kv = KV::open(get_test_option(&dir)).await;
+    let kv = DB::open(get_test_option(&dir)).await;
     let kv = kv.unwrap();
     let mut wg = awaitgroup::WaitGroup::new();
     let n = 20;
@@ -495,7 +502,7 @@ async fn kv_iterator_basic() {
     tracing_log();
     let kv = build_kv().await;
 
-    let n = 10000;
+    let n = 1;
     let bkey = |i: usize| format!("{:09}", i).as_bytes().to_vec();
     let bvalue = |i: usize| format!("{:025}", i).as_bytes().to_vec();
 
@@ -519,8 +526,9 @@ async fn kv_iterator_basic() {
         let itr = kv.new_iterator(opt).await;
         let mut count = 0;
         let mut rewind = true;
-        info!("Startinh first basic iteration");
-        itr.rewind().await;
+        info!("Starting first basic iteration");
+        let first = itr.rewind().await;
+        info!("first {:?}", first);
         while let Some(item) = itr.peek().await {
             let rd_item = item.rl().await;
             let key = rd_item.key();
@@ -674,7 +682,7 @@ async fn t_delete_without_sync_write() {
     drop(kv);
     // Reopen kv, it should failed
     {
-        let kv = KV::open(opt).await.unwrap();
+        let kv = DB::open(opt).await.unwrap();
         let got = kv.get_with_ext(&key).await;
         assert!(got.unwrap_err().is_not_found());
     }
@@ -700,7 +708,7 @@ async fn t_kv_set_if_absent() {
 async fn t_kv_pid_file() {
     tracing_log();
     let kv1 = build_kv().await;
-    let kv2 = KV::open(kv1.opt.clone()).await;
+    let kv2 = DB::open(kv1.opt.clone()).await;
     let err = kv2.unwrap_err();
     assert!(err
         .to_string()
@@ -848,7 +856,7 @@ async fn t_kv_dump() {
             .create(true)
             .open(back_tmp.clone())
             .unwrap();
-        kv.backup(fp).await.unwrap();
+        kv.backup(fp, 0).await.unwrap();
         kv.close().await.unwrap();
         info!("backup: {:?}", back_tmp);
     }
@@ -889,11 +897,11 @@ async fn t_kv_dump() {
 // );
 // }
 
-async fn build_kv() -> KV {
+async fn build_kv() -> DB {
     use crate::test_util::random_tmp_dir;
     tracing_log();
     let dir = random_tmp_dir();
-    let kv = KV::open(get_test_option(&dir)).await;
+    let kv = DB::open(get_test_option(&dir)).await;
     let kv = kv.unwrap();
     kv
 }
diff --git a/src/levels.rs b/src/levels.rs
index 44eb7be..2509693 100644
--- a/src/levels.rs
+++ b/src/levels.rs
@@ -7,7 +7,7 @@ use crate::pb::badgerpb3::ManifestChange;
 use crate::table::builder::Builder;
 use crate::table::iterator::{ConcatIterator, IteratorImpl, IteratorItem};
 use crate::table::table::{get_id_map, new_file_name, Table, TableCore};
-use crate::types::{Channel, Closer, TArcMx, TArcRW, XArc};
+use crate::types::{Channel, Closer, TArcRW, XArc};
 use crate::y::{
     async_sync_directory, create_synced_file, open_existing_synced_file, sync_directory,
 };
@@ -18,24 +18,21 @@ use atomic::Ordering;
 use awaitgroup::WaitGroup;
 use drop_cell::defer;
 use log::{debug, error, info, warn};
-use parking_lot::lock_api::RawRwLock;
 use tracing::instrument;
 
-use itertools::Itertools;
+use crate::pb::badgerpb3::manifest_change::Operation::{CREATE, DELETE};
 use rand::random;
 use std::collections::HashSet;
 use std::fmt::{Debug, Display, Formatter};
 use std::fs::remove_file;
 use std::io::Write;
-use std::ops::Deref;
+use std::ops::{Deref, MulAssign};
 use std::sync::atomic::AtomicU64;
-use std::sync::{Arc, RwLockReadGuard};
+use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 use std::vec;
 use tokio::macros::support::thread_rng_n;
-use tokio::sync::{RwLock, RwLockWriteGuard};
-use tokio::time::sleep;
-use crate::pb::badgerpb3::manifest_change::Operation::{CREATE, DELETE};
+use tokio::sync::RwLock;
 
 #[derive(Clone)]
 pub(crate) struct LevelsController {
@@ -235,7 +232,7 @@ impl LevelsController {
         let pick: Vec<CompactionPriority> = self.pick_compact_levels();
         info!("Try to compact levels, {:?}", pick);
         if pick.is_empty() {
-            zero_level_compact_chan.try_send(());
+            _ = zero_level_compact_chan.try_send(());
         }
         for p in pick {
             match self.do_compact(p.clone()).await {
@@ -243,7 +240,7 @@ impl LevelsController {
                 Ok(true) => {
                     info!("Succeed to compacted");
                     if p.level == 0 {
                         // zero level has compacted, memory SkipList can continue handle *write request*
-                        zero_level_compact_chan.try_send(());
+                        _ = zero_level_compact_chan.try_send(());
                     }
                 },
                 Ok(false) => {
@@ -262,14 +259,14 @@ impl LevelsController {
             info!("Try to compact levels, {:?}", pick);
             if pick.is_empty() {
                 // No table need to compact, notify `KV` continue handle *write request*
-                zero_level_compact_chan.try_send(());
+                _ = zero_level_compact_chan.try_send(());
             }
             for p in pick {
                 match self.do_compact(p.clone()).await {
                     Ok(true) => {
                         info!("Succeed to compacted");
                         if p.level == 0 {
-                            zero_level_compact_chan.try_send(());
+                            _ = zero_level_compact_chan.try_send(());
                        }
                    },
                    Ok(false) => {
@@ -442,7 +439,7 @@ impl LevelsController {
         let notify_try_compact_chan = self.notify_try_compact_chan.tx();
         while !self.levels[0].try_add_level0_table(table.clone()).await {
             // Notify compact job
-            notify_try_compact_chan.try_send(());
+            _ = notify_try_compact_chan.try_send(());
             // Stall. Make sure all levels are healthy before we unstall.
             let mut start_time = SystemTime::now();
             {
@@ -455,7 +452,12 @@ impl LevelsController {
                     .unwrap()
                     .as_millis()
             );
-            info!("{:?}, {}", self.opt, self.levels[0].num_tables());
+            info!(
+                "{:?}, {}, cost: {:?}",
+                self.opt,
+                self.levels[0].num_tables(),
+                start_time.elapsed().unwrap()
+            );
             let c_status = self.c_status.rl();
             for i in 0..self.opt.max_levels {
                 info!(
@@ -537,7 +539,7 @@ impl LevelsController {
         let mut g = WaitGroup::new();
         let execute_time = SystemTime::now();
         defer! {
-            let cost = SystemTime::now().duration_since(execute_time).unwrap().as_millis();
+            let _cost = SystemTime::now().duration_since(execute_time).unwrap().as_millis();
         }
         {
             let cd = cd.read().await;
@@ -558,7 +560,7 @@ impl LevelsController {
         } else {
             assert_eq!(1, top_tables.len());
         }
-        let is_empty = bot_tables.is_empty();
+        let _is_empty = bot_tables.is_empty();
         for tb in top_tables {
             let iter = Box::new(IteratorImpl::new(tb, false));
             itr.push(iter);
@@ -574,39 +576,20 @@ impl LevelsController {
         // mitr.export_disk_ext();
         // }
         mitr.rewind();
-        let tid = random::<u64>();
-        let mut count = 0;
-        let cur = tokio::runtime::Handle::current();
+        let mut _count = 0;
+        let _cur = tokio::runtime::Handle::current();
         loop {
             // #[cfg(test)]
             // let mut keys = vec![];
             let start_time = SystemTime::now();
             let mut builder = Builder::default();
             while let Some(value) = mitr.peek() {
-                count += 1;
+                _count += 1;
                 assert!(builder.add(value.key(), value.value()).is_ok());
                 mitr.next();
                 if builder.reached_capacity(self.opt.max_table_size) {
                     break;
                 }
-
-                // #[cfg(test)]
-                // {
-                //     // error!("merge, mitr{}, key {}", mitr.id(), hex_str(value.key()));
-                //     {
-                //         crate::test_util::push_log(
-                //             format!(
-                //                 "tid:{}, mitr:{}, key:{}",
-                //                 tid,
-                //                 mitr.id(),
-                //                 hex_str(value.key())
-                //             )
-                //             .as_bytes(),
-                //             false,
-                //         );
-                //     }
-                //     keys.push(value.key().to_vec());
-                // }
             }
             if builder.is_zero_bytes() {
                 warn!("Builder is empty");
@@ -916,15 +899,6 @@ impl LevelsController {
         let id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
         id
     }
-
-    pub(crate) fn print_level_fids(&self) {
-        let sz = self
-            .levels
-            .iter()
-            .map(|lv| lv.num_tables())
-            .collect::<Vec<_>>();
-        warn!("every level table's size: {:?}", sz);
-    }
 }
 
 #[derive(Debug, Clone)]
diff --git a/src/lib.rs b/src/lib.rs
index 34dc6c8..96c1cea 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -5,7 +5,6 @@
 #![feature(type_alias_impl_trait)]
 #![feature(strict_provenance_atomic_ptr)]
 #![feature(atomic_from_mut)]
-#![feature(cursor_remaining)]
 #![feature(pattern)]
 #![feature(cell_leak)]
 #![feature(path_file_prefix)]
@@ -22,7 +21,7 @@
 #![feature(binary_heap_into_iter_sorted)]
 #![feature(test)]
 #![feature(atomic_from_ptr, pointer_is_aligned)]
-
+#![feature(unboxed_closures)]
 
 /// Badger DB is an embedded keyvalue database.
 ///
@@ -48,6 +47,7 @@ mod y;
 mod compaction;
 // #[cfg(test)]
 // mod kv_test;
+mod backup;
 #[cfg(test)]
 mod kv_test;
 mod levels;
@@ -55,7 +55,7 @@ mod pb;
 mod st_manager;
 #[cfg(test)]
 mod test_util;
-mod backup;
+mod transition;
 
 pub use iterator::*;
 pub use kv::*;
@@ -76,4 +76,4 @@ pub(crate) fn must_align<T>(ptr: *const T) {
 pub(crate) fn cals_size_with_align(sz: usize, align_sz: usize) -> usize {
     let size = (sz + align_sz) & !align_sz;
     size
-}
\ No newline at end of file
+}
diff --git a/src/skl/skip.rs b/src/skl/skip.rs
index e44c433..fa2d63f 100644
--- a/src/skl/skip.rs
+++ b/src/skl/skip.rs
@@ -1,15 +1,15 @@
 use crate::skl::{Cursor, HEIGHT_INCREASE, MAX_HEIGHT};
 use crate::table::iterator::IteratorItem;
-use crate::y::ValueStruct;
+use crate::y::{same_key_ignore_version, ValueStruct};
 use crate::{Allocate, Xiterator};
+use drop_cell::defer;
 use log::{info, warn};
 use rand::random;
 use std::fmt::{Debug, Display, Formatter};
 use std::sync::atomic::{AtomicPtr, AtomicU32, Ordering};
 use std::sync::Arc;
 use std::{cmp, ptr, sync::atomic::AtomicI32};
-use drop_cell::defer;
 use uuid::Uuid;
 
 use super::{arena::Arena, node::Node};
@@ -74,15 +74,10 @@ impl SkipList {
     }
 
     // Sub crease the reference count, deallocating the skiplist when done using it
-    // TODO
     pub fn decr_ref(&self) {
         self._ref.fetch_sub(1, Ordering::Release);
     }
 
-    // fn valid(&self) -> bool {
-    //     self.arena_ref().valid()
-    // }
-
     pub(crate) fn arena_ref(&self) -> &Arena {
         &self.arena
     }
@@ -226,8 +221,8 @@ impl SkipList {
 
     /// Inserts the key-value pair.
     /// FIXME: it bad, should be not use unsafe, but ....
-    pub fn put(&self, key: &[u8], v: ValueStruct) {
-        self._put(key, v)
+    pub fn put<T: AsRef<[u8]>>(&self, key: T, v: ValueStruct) {
+        self._put(key.as_ref(), v)
     }
 
     fn _put(&self, key: &[u8], v: ValueStruct) {
@@ -375,6 +370,10 @@ impl SkipList {
         }
         if let Some(node) = node {
             let (offset, size) = node.get_value_offset();
+            // A different key can share the same prefix; verify this is really the same key.
+            if !same_key_ignore_version(key, self.arena.get_key(node.key_offset, node.key_size)) {
+                return None;
+            }
             let value = self.arena.get_val(offset, size);
 
             #[cfg(test)]
@@ -458,15 +457,17 @@ impl Display for SkipList {
             k: String,
             v: String,
         }
-        let mut kv = vec![];
+        let mut kv = vec![KV {
+            k: "st-id".to_owned(),
+            v: self.id().to_string(),
+        }];
         for _kv in self.key_values() {
             kv.push(KV {
-                k: String::from_utf8(_kv.0.to_vec()).unwrap(),
-                v: String::from_utf8(_kv.1.value).unwrap(),
+                k: crate::hex_str(_kv.0),
+                v: crate::hex_str(_kv.1.value),
             });
         }
         let table = Table::new(kv).to_string();
-        writeln!(f, "SkipList=>").unwrap();
         writeln!(f, "{}", table)
     }
 }
@@ -877,7 +878,7 @@ mod tests {
 
     #[test]
     fn t_one_key() {
-        let key = "thekey";
+        let key = b"thekey";
         let st = Arc::new(SkipList::new(ARENA_SIZE));
         let mut waits = (0..100)
             .map(|i| {
@@ -885,8 +886,8 @@ mod tests {
                 let key = key.clone();
                 spawn(move || {
                     st.put(
-                        key.as_bytes(),
-                        ValueStruct::new(format!("{}", i).as_bytes().to_vec(), 0, 0, i as u64),
+                        &key,
+                        ValueStruct::new(i.to_string().as_bytes().to_vec(), 0, 0, i as u64),
                     );
                 })
             })
@@ -898,7 +899,7 @@ mod tests {
             let key = key.clone();
             let save_value = save_value.clone();
             let join = spawn(move || {
-                if let Some(value) = st.get(key.as_bytes()) {
+                if let Some(value) = st.get(&key) {
                     save_value.fetch_add(1, Ordering::Relaxed);
                     let v = String::from_utf8_lossy(&value.value)
                         .parse::<u64>()
diff --git a/src/table/builder.rs b/src/table/builder.rs
index af30a65..876920d 100644
--- a/src/table/builder.rs
+++ b/src/table/builder.rs
@@ -2,9 +2,7 @@ use crate::y::{hash, is_eof, Decode, Encode, ValueStruct};
 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
 use drop_cell::defer;
 use growable_bloom_filter::GrowableBloom;
-use log::{debug, info};
 use serde_json;
-use std::hash::Hasher;
 use std::io::{Cursor, Read, Write};
 use std::time::SystemTime;
 
@@ -231,12 +229,12 @@ impl Default for Builder {
     fn default() -> Self {
         Self {
             counter: 0,
-            buf: Cursor::new(Vec::with_capacity(64 << 20)),
+            buf: Cursor::new(Vec::with_capacity(1 << 20)),
             base_key: vec![],
             base_offset: 0,
             restarts: vec![],
             prev_offset: u32::MAX,
-            key_buf: Cursor::new(Vec::with_capacity(32 << 20)),
+            key_buf: Cursor::new(Vec::with_capacity(1 << 20)),
             key_count: 0,
         }
     }
diff --git a/src/table/table.rs b/src/table/table.rs
index e6daef4..23c9488 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -51,7 +51,6 @@ impl Display for KeyOffset {
 }
 
 pub type Table = XArc<TableCore>;
-pub type WeakTable = XWeak<TableCore>;
 
 impl From<TableCore> for Table {
     fn from(value: TableCore) -> Self {
@@ -164,38 +163,18 @@ impl TableCore {
         let mut tc = table_ref.to_inner().unwrap();
         tc.biggest = biggest;
         tc.smallest = smallest;
-        // info!("open table ==> {}", tc);
+
         Ok(tc)
     }
 
     // increments the refcount (having to do with whether the file should be deleted)
     pub(crate) fn incr_ref(&self) {
-        use std::backtrace::Backtrace;
-        let count = self._ref.fetch_add(1, Ordering::Release);
-        let buf = format!(
-            "incr {} table count {} => {}",
-            self.id,
-            count,
-            self.get_ref()
-        );
-        // info!("{}", buf);
-        //
-        // info!(
-        //     "BackTrace at table incr reference: {}",
-        //     Backtrace::force_capture()
-        // );
-        //push_log(buf.as_bytes(), false);
+        //use std::backtrace::Backtrace;
+        self._ref.fetch_add(1, Ordering::Release);
     }
 
     // decrements the refcount and possibly deletes the table
     pub(crate) fn decr_ref(&self) {
-        let count = self._ref.fetch_sub(1, Ordering::Release);
-        let buf = format!(
-            "decr {} table count {} => {}",
-            self.id,
-            count,
-            self.get_ref()
-        );
-        //push_log(buf.as_bytes(), false);
+        self._ref.fetch_sub(1, Ordering::Release);
     }
 
     pub(crate) fn get_ref(&self) -> i32 {
@@ -385,11 +364,6 @@ impl Drop for TableCore {
 
 impl Display for TableCore {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let index_str = self
-            .block_index
-            .iter()
-            .map(|x| format!("{}", x))
-            .collect::<Vec<_>>();
         let smallest = hex_str(self.smallest());
         let biggest = hex_str(self.biggest());
         f.debug_struct("Table")
diff --git a/src/test_util.rs b/src/test_util.rs
index 9a03b4b..64810f5 100644
--- a/src/test_util.rs
+++ b/src/test_util.rs
@@ -1,15 +1,12 @@
-use atomic::Atomic;
+use crate::Options;
 use chrono::Local;
-use log::{info, kv::source::as_map, kv::Source, warn, Level};
+use log::{info, warn};
 use rand::random;
-use std::collections::HashMap;
 use std::env::temp_dir;
 use std::fs::create_dir_all;
 use std::io;
-use std::sync::atomic::{AtomicI32, AtomicU64, Ordering};
+use std::sync::atomic::Ordering;
 use std::sync::Arc;
-use std::time::Duration;
-use tokio::runtime::Handle;
 use tokio_metrics::TaskMonitor;
 use tracing_subscriber::fmt::format::Writer;
 use tracing_subscriber::fmt::time::FormatTime;
@@ -33,7 +30,7 @@ pub fn push_log(buf: &[u8], rd: bool) {
     // #[cfg(test)]
     // return;
     use std::io::Write;
-    let mut fpath = "raw_log.log";
+    let fpath = "raw_log.log";
     let mut fp = if !rd {
         std::fs::File::options()
             .write(true)
@@ -56,13 +53,11 @@ pub fn push_log(buf: &[u8], rd: bool) {
 #[cfg(test)]
 pub fn remove_push_log() {
     use std::fs::remove_file;
-    remove_file("raw_log.log");
+    remove_file("raw_log.log").unwrap();
 }
 
 #[cfg(test)]
 pub(crate) fn tracing_log() {
-    use libc::remove;
-    use tracing::{info, Level};
     use tracing_subscriber;
 
     struct LocalTimer;
@@ -99,23 +94,6 @@ pub(crate) fn tracing_log() {
     // let recorder = metrics_prometheus::install();
 }
 
-#[cfg(test)]
-pub(crate) async fn start_metrics() -> TaskMonitor {
-    let monitor = tokio_metrics::TaskMonitor::new();
-    // print task metrics every 500ms
-    {
-        let frequency = std::time::Duration::from_millis(500);
-        let monitor = monitor.clone();
-        tokio::spawn(async move {
-            for metrics in monitor.intervals() {
-                warn!("{:?}", metrics);
-                tokio::time::sleep(frequency).await;
-            }
-        });
-    }
-    monitor
-}
-
 pub fn random_tmp_dir() -> String {
     let id = random::<u64>();
     let path = temp_dir().join(id.to_string()).join("badger");
@@ -128,24 +106,6 @@ pub fn create_random_tmp_dir() -> String {
     fpath
 }
 
-#[test]
-fn it_work() {
-    #[tracing::instrument(skip_all)]
-    fn call() {
-        info!("call c");
-    }
-
-    #[tracing::instrument(skip_all)]
-    fn my_function(my_arg: usize) {
-        info!("execute my function");
-        call();
-    }
-
-    tracing_log();
-    my_function(1000);
-    info!("Hello Body");
-}
-
 #[tokio::test]
 async fn runtime_tk() {
     use tokio::{sync::RwLock, task::JoinHandle};
@@ -197,7 +157,7 @@ fn tk2() {
     let a = Arc::new(std::sync::atomic::AtomicI32::new(10000000));
     let ac = a.clone();
     rt.block_on(async move {
        for _ in 0..10000 {
             let ac = ac.clone();
             tokio::spawn(async move {
                 ac.fetch_sub(1, Ordering::Relaxed);
@@ -214,6 +174,13 @@ fn tk2() {
         println!("return {}", ret);
     });
     println!("{}", a.load(Ordering::Relaxed));
+}
 
-    use itertools::Merge;
+pub(crate) fn get_test_option(dir: &str) -> Options {
+    let mut opt = Options::default();
+    opt.max_table_size = 1 << 15; // Force more compaction.
+    opt.level_one_size = 4 << 15; // Force more compaction.
+    opt.dir = Box::new(dir.to_string());
+    opt.value_dir = Box::new(dir.to_string());
+    opt
 }
diff --git a/src/transition.rs b/src/transition.rs
new file mode 100644
index 0000000..9d4e9f8
--- /dev/null
+++ b/src/transition.rs
@@ -0,0 +1,308 @@
+use crate::iterator::{KVItemInner, PreFetchStatus};
+use crate::table::iterator::IteratorItem;
+use crate::types::TArcRW;
+use crate::value_log::{Entry, MetaBit};
+use crate::y::compare_key;
+use crate::{
+    Error, IteratorExt, IteratorOptions, KVItem, MergeIterOverBuilder, Result, UniIterator,
+    ValueStruct, Xiterator, DB, TXN_KEY,
+};
+use drop_cell::defer;
+use parking_lot::RwLock;
+use std::collections::{BinaryHeap, HashMap, HashSet};
+use std::ops::Deref;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+use tokio::sync::{RwLockReadGuard, RwLockWriteGuard};
+
+#[derive(Clone, Default)]
+pub struct GlobalTxNState {
+    inner: TArcRW<GlobalTxNStateInner>,
+}
+
+impl GlobalTxNState {
+    pub async fn rl(&self) -> RwLockReadGuard<'_, GlobalTxNStateInner> {
+        self.inner.read().await
+    }
+    pub async fn wl(&self) -> RwLockWriteGuard<'_, GlobalTxNStateInner> {
+        self.inner.write().await
+    }
+
+    pub async fn read_ts(&self) -> u64 {
+        self.rl().await.read_ts()
+    }
+}
+
+struct GlobalTxNStateInner {
+    // TODO: maybe this lock is unnecessary, because the caller already serializes access.
+    lock: Arc<RwLock<()>>,
+    cur_read: AtomicU64,
+    next_commit: AtomicU64,
+    // These two structures are used to figure out when a commit is done. The minimum done commit is
+    // used to update cur_read.
+    commit_mark: BinaryHeap<u64>,
+    pending_commits: HashSet<u64>,
+
+    // commits stores a key fingerprint and the latest commit counter for it.
+    commits: HashMap<u64, u64>,
+}
+
+impl Default for GlobalTxNStateInner {
+    fn default() -> Self {
+        GlobalTxNStateInner {
+            lock: Default::default(),
+            cur_read: Default::default(),
+            next_commit: AtomicU64::new(1),
+            commit_mark: Default::default(),
+            pending_commits: Default::default(),
+            commits: Default::default(),
+        }
+    }
+}
+
+impl GlobalTxNStateInner {
+    fn read_ts(&self) -> u64 {
+        self.cur_read.load(Ordering::SeqCst)
+    }
+
+    // Must be called while holding a lock.
+    fn has_conflict(&self, tx: &TxN) -> bool {
+        let tx_reads = tx.reads.write().unwrap();
+        if tx_reads.is_empty() {
+            return false;
+        }
+        // Conflict if any key this tx read was committed with a version newer than tx.read_ts.
+        for ro in tx_reads.iter() {
+            if let Some(ts) = self.commits.get(ro)
+                && *ts > tx.read_ts
+            {
+                return true;
+            }
+        }
+        false
+    }
+
+    fn new_commit_ts(&mut self, tx: &TxN) -> u64 {
+        let _lock = self.lock.write();
+
+        if self.has_conflict(tx) {
+            return 0;
+        }
+
+        let ts = self.next_commit.load(Ordering::SeqCst);
+        let tx_writes = tx.writes.write().unwrap();
+        for w in tx_writes.iter() {
+            // Update the commit_ts.
+            self.commits.insert(*w, ts);
+        }
+        self.commit_mark.push(ts);
+        if self.pending_commits.contains(&ts) {
+            panic!("We shouldn't have the commit ts: {}", ts);
+        }
+
+        self.pending_commits.insert(ts);
+        self.next_commit.fetch_add(1, Ordering::SeqCst);
+        ts
+    }
+
+    fn done_commit(&mut self, cts: u64) {
+        let _lock = self.lock.write();
+        if !self.pending_commits.remove(&cts) {
+            panic!("We should already have the commit ts: {}", cts);
+        }
+
+        let mut min = 0;
+        while !self.commit_mark.is_empty() {
+            let ts = self.commit_mark.peek().unwrap();
+            if self.pending_commits.contains(ts) {
+                // Still waiting for a txn to commit.
+                break;
+            }
+            min = *ts;
+            self.commit_mark.pop();
+        }
+
+        if min == 0 {
+            return;
+        }
+
+        self.cur_read.store(min, Ordering::SeqCst);
+        self.next_commit.store(min + 1, Ordering::SeqCst);
+    }
+}
+
+#[derive(Clone)]
+pub struct TxN {
+    // The read timestamp; it is also the version used when looking up a key.
+    pub(crate) read_ts: u64,
+    // update is used to conditionally keep track of reads.
+    pub(crate) update: bool,
+    // contains fingerprints of keys read.
+    pub(crate) reads: Arc<std::sync::RwLock<Vec<u64>>>,
+    // contains fingerprints (hash64(key)) of keys written.
+    pub(crate) writes: Arc<std::sync::RwLock<Vec<u64>>>,
+    // cache stores any writes done by this txn.
+    pub(crate) pending_writes: Arc<std::sync::RwLock<HashMap<Vec<u8>, Entry>>>,
+
+    pub(crate) kv: DB,
+}
+
+impl TxN {
+    // Sets a key/value pair with user meta in this tx.
+    pub fn set(&self, key: Vec<u8>, value: Vec<u8>, user_meta: u8) {
+        let fp = farmhash::fingerprint64(&key); // Avoid dealing with byte arrays.
+        self.writes
+            .write()
+            .map(|mut writes| writes.push(fp))
+            .unwrap();
+        let entry = Entry::default()
+            .key(key.clone())
+            .value(value)
+            .user_meta(user_meta);
+        self.pending_writes
+            .write()
+            .map(|mut writes| writes.insert(key, entry))
+            .unwrap();
+    }
+
+    // Marks a key deleted in this tx.
+    pub fn delete(&self, key: &[u8]) {
+        let fp = farmhash::fingerprint64(&key);
+        self.writes
+            .write()
+            .map(|mut writes| writes.push(fp))
+            .unwrap();
+        let entry = Entry::default()
+            .key(key.to_vec())
+            .meta(MetaBit::BIT_DELETE.bits());
+        self.pending_writes
+            .write()
+            .map(|mut writes| writes.insert(key.to_vec(), entry))
+            .unwrap();
+    }
+
+    pub async fn get(&self, key: &[u8]) -> Result<KVItem> {
+        let kv_db = self.get_kv();
+        let item = KVItem::from(KVItemInner::new(
+            vec![],
+            ValueStruct::default(),
+            kv_db.clone(),
+        ));
+        // If the transaction is writable, prioritize reading the local pending writes.
+        if self.update {
+            let pending_writes = self.pending_writes.write().unwrap();
+            if let Some(e) = pending_writes.get(key)
+                && &e.key == key
+            {
+                // Fulfill from cache.
+                {
+                    let mut wl = item.wl().await;
+                    wl.meta = e.meta;
+                    *wl.value.lock().await = e.value.clone();
+                    wl.user_meta = e.user_meta;
+                    wl.key = e.key.clone();
+                    *wl.status.write().unwrap() = PreFetchStatus::Prefetched;
+                }
+                // We probably don't need to set KV on item here.
+                return Ok(item);
+            }
+
+            // Not found in the local writes: record the read fingerprint, because the key
+            // will be looked up in the DB below.
+            let fp = farmhash::fingerprint64(key);
+            self.reads
+                .write()
+                .map(|mut writes| writes.push(fp))
+                .unwrap();
+        }
+        // Seek from the DB.
+        let seek = crate::y::key_with_ts(key, self.read_ts);
+        kv_db.get_with_ext(&seek).await
+    }
+
+    pub async fn commit(&self) -> Result<()> {
+        if self.writes.write().unwrap().is_empty() {
+            return Ok(()); // Read-only transaction.
+        }
+        // TODO FIXME lock
+        let gs = self.get_kv().txn_state.clone();
+        let mut gs = gs.inner.write().await;
+        // Get a new commit ts for the current tx.
+        let commit_ts = gs.new_commit_ts(&self);
+        if commit_ts == 0 {
+            return Err(Error::TxCommitConflict);
+        }
+
+        let mut entries = vec![];
+        let mut pending_writes = self.pending_writes.write().unwrap();
+        for e in pending_writes.values_mut().into_iter() {
+            // Suffix the keys with the commit ts, so the key versions are sorted in
+            // descending order of commit timestamp.
+            let ts_key = crate::y::key_with_ts(&e.key, commit_ts);
+            e.key.clear();
+            e.key.extend_from_slice(&ts_key);
+            entries.push(e.clone());
+        }
+        // TODO: Add logic in replay to deal with this.
+        let entry = Entry::default()
+            .key(TXN_KEY.to_vec())
+            .value(commit_ts.to_string().as_bytes().to_vec())
+            .meta(MetaBit::BIT_FIN_TXN.bits());
+        entries.push(entry);
+
+        // TODO: what if some keys fail to execute?
+        defer! {gs.done_commit(commit_ts)};
+
+        let db = self.get_kv();
+        let res = db.batch_set(entries).await;
+
+        for ret in res {
+            ret?;
+        }
+
+        Ok(())
+    }
+
+    fn new_iterator(&self, opt: IteratorOptions) -> IteratorExt {
+        let db = self.get_kv();
+        // Notice: this is a global iterator, so we must take a reference on every memtable
+        // (SkipList), sst (file) and vlog (file) involved.
+        let p = crossbeam_epoch::pin();
+        let tables = db.get_mem_tables(&p);
+        // TODO: it should be decremented when the IteratorExt is closed.
+        defer! {
+            tables.iter().for_each(|table| unsafe {table.as_ref().unwrap().decr_ref()});
+        }
+        // Add a vlog reference.
+        db.must_vlog().incr_iterator_count();
+
+        // Create iterators across all the tables involved first.
+        let mut itrs: Vec<Box<dyn Xiterator<Output = IteratorItem>>> = vec![];
+        for tb in tables.clone() {
+            let st = unsafe { tb.as_ref().unwrap().clone() };
+            let iter = Box::new(UniIterator::new(st, opt.reverse));
+            itrs.push(iter);
+        }
+        // Extend with the sst.table iterators.
+        itrs.extend(db.must_lc().as_iterator(opt.reverse));
+        let mitr = MergeIterOverBuilder::default().add_batch(itrs).build();
+        let txn = self as *const TxN;
+        IteratorExt::new(txn, mitr, opt)
+    }
+
+    pub(crate) fn get_kv(&self) -> DB {
+        self.kv.clone()
+    }
+}
+
+pub(crate) struct BoxTxN {
+    pub tx: *const TxN,
+}
+
+unsafe impl Send for BoxTxN {}
+
+unsafe impl Sync for BoxTxN {}
+
+impl BoxTxN {
+    pub(crate) fn new(tx: *const TxN) -> BoxTxN {
+        BoxTxN { tx }
+    }
+}
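The heart of the oracle above is `has_conflict`: a transaction T must abort iff some key T read was committed by another transaction after T's `read_ts`. A minimal standalone model of that rule, using plain `u64` fingerprints rather than the crate's types:

```rust
use std::collections::HashMap;

// Model of GlobalTxNStateInner::has_conflict: `commits` maps a key
// fingerprint to the latest commit ts that touched it.
fn has_conflict(commits: &HashMap<u64, u64>, reads: &[u64], read_ts: u64) -> bool {
    reads
        .iter()
        .any(|fp| matches!(commits.get(fp), Some(&ts) if ts > read_ts))
}

fn main() {
    let mut commits = HashMap::new();
    commits.insert(42, 7); // key fingerprint 42 last committed at ts 7
    assert!(has_conflict(&commits, &[42], 5)); // we read at ts 5 -> stale read
    assert!(!has_conflict(&commits, &[42], 7)); // nothing newer than our snapshot
}
```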
diff --git a/src/value_log.rs b/src/value_log.rs
index 8b27287..aa5a2a2 100644
--- a/src/value_log.rs
+++ b/src/value_log.rs
@@ -1,33 +1,24 @@
 use async_channel::Receiver;
-use atomic::Atomic;
-
-use bitflags::{bitflags, Flags};
+use bitflags::bitflags;
 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
-use bytes::BufMut;
+use bytes::Buf;
 use crc32fast::Hasher;
 use drop_cell::defer;
 use getset::{Getters, Setters};
-
-use log::kv::Source;
-use log::{debug, info};
+use log::info;
 use memmap::Mmap;
-
 use rand::random;
-
 use std::collections::{HashMap, HashSet};
-use std::fmt::{Debug, Display, Formatter};
+use std::fmt::{Debug, Formatter};
 use std::fs::{read_dir, remove_file};
 use std::future::Future;
-use std::io::{BufRead, Cursor, Read, Seek, SeekFrom, Write};
-
+use std::io::{Cursor, Read, Seek, SeekFrom, Write};
 use std::mem::size_of;
-use std::ops::{Deref, Index};
+use std::ops::Index;
 use std::path::Path;
 use std::pin::Pin;
-
 use std::sync::atomic::{AtomicI32, AtomicU32, AtomicU64, Ordering};
 use std::sync::Arc;
-
 use std::time::{Duration, SystemTime};
 use std::{fmt, fs, io, ptr};
 
@@ -40,7 +31,7 @@ use crate::options::Options;
 use crate::types::{Channel, Closer, TArcRW};
 use crate::y::{create_synced_file, open_existing_synced_file, sync_directory, Decode, Encode};
 use crate::Error::Unexpected;
-use crate::{event, hex_str, Error, Result, EMPTY_SLICE};
+use crate::{event, Error, Result, EMPTY_SLICE};
 
 bitflags! {
     /// Values have their first byte being byteData or byteDelete. This helps us distinguish between
@@ -53,7 +44,9 @@ bitflags! {
         const BIT_UNUSED = 4;
         /// Set if the key is set using SetIfAbsent.
         const BIT_SET_IF_ABSENT = 8;
-    }
+        /// Set if the entry is used to indicate the end of a txn in the value log.
+        const BIT_FIN_TXN = 16;
+    }
 }
 
 const M: u64 = 1 << 20;
@@ -373,7 +366,7 @@ pub struct Request {
     // Input values
     pub(crate) entries: Vec<EntryType>,
     // Output Values and wait group stuff below
-    pub(crate) ptrs: Vec<Arc<Atomic<Option<ValuePointer>>>>,
+    pub(crate) ptrs: Vec<Arc<std::sync::RwLock<Option<ValuePointer>>>>,
 }
 
 impl Default for Request {
@@ -417,14 +410,6 @@ impl Request {
             .map(|ty| ty.fut_ch.clone())
             .collect::<Vec<_>>()
     }
-
-    fn get_ptrs(&self) -> Vec<Arc<Atomic<Option<ValuePointer>>>> {
-        self.ptrs.clone()
-    }
-
-    fn get_ptr(&self, i: usize) -> Option<&Arc<Atomic<Option<ValuePointer>>>> {
-        self.ptrs.get(i)
-    }
 }
 
 pub struct ValueLogCore {
@@ -759,19 +744,10 @@ impl ValueLogCore {
             if !self.opt.sync_writes && entry.entry().value.len() < self.opt.value_threshold {
                 // No need to write to value log.
                 // WARN: if mt not flush into disk but process abort, that will discard data(the data not write into vlog that WAL file)
-                req.ptrs[idx] = Arc::new(Atomic::new(None));
+                req.ptrs[idx] = Arc::new(std::sync::RwLock::new(None));
                 continue;
             }
 
-            #[cfg(test)]
-            debug!(
-                "Write # {:?} => {} into vlog file, offset: {}, meta:{}",
-                hex_str(entry.entry().key.as_ref()),
-                hex_str(entry.entry().value.as_ref()),
-                self.buf.read().await.get_ref().len()
-                    + self.writable_log_offset.load(Ordering::Acquire) as usize,
-                entry.entry.meta,
-            );
             let mut ptr = ValuePointer::default();
             ptr.fid = cur_fid;
             // Use the offset including buffer length so far.
@@ -786,13 +762,13 @@ impl ValueLogCore {
                 buf.get_ref().len() + self.writable_log_offset.load(Ordering::Acquire) as usize,
                 ptr.offset as usize + sz
             );
-            req.ptrs[idx].store(Some(ptr), Ordering::Release);
+            req.ptrs[idx].write().unwrap().replace(ptr);
         }
     }
     {
-        assert!(wt_count <= 0 || !self.buf.read().await.is_empty());
+        assert!(wt_count <= 0 || self.buf.read().await.has_remaining());
         let mut buffer = self.buf.write().await;
-        if buffer.is_empty() {
+        if !buffer.has_remaining() {
            return Ok(());
        }
        // write value pointer into vlog file. (Just only write to mmap)
@@ -1087,11 +1063,6 @@
             }
             count += 1;
-            #[cfg(test)]
-            if count == 1 {
-                debug!("merge from {}", vptr.offset);
-            }
-            // TODO confiure
             if count % 100 == 0 {
                 tokio::time::sleep(Duration::from_millis(1)).await;
diff --git a/src/y/codec.rs b/src/y/codec.rs
index 88f492e..78cfaa8 100644
--- a/src/y/codec.rs
+++ b/src/y/codec.rs
@@ -1,5 +1,6 @@
 use async_trait::async_trait;
 use std::io::{Read, Write};
+use tokio::io::{AsyncRead, AsyncWrite};
 
 use crate::Result;
 
@@ -10,7 +11,6 @@ pub trait Encode {
 pub trait Decode {
     fn dec(&mut self, rd: &mut dyn Read) -> Result<()>;
 }
-use tokio::io::{AsyncRead, AsyncWrite};
 
 #[async_trait]
 pub trait AsyncEncDec
diff --git a/src/y/merge_iterator.rs b/src/y/merge_iterator.rs
index 5d2b451..cc98a13 100644
--- a/src/y/merge_iterator.rs
+++ b/src/y/merge_iterator.rs
@@ -51,6 +51,7 @@ impl Xiterator for MergeIterator {
             self.set_iter_empty();
             return None;
         }
+        // rewind must be called before calling next.
         assert_ne!(self.cursor.borrow().index, usize::MAX);
         self._next()
     }
@@ -62,7 +63,7 @@ impl Xiterator for MergeIterator {
             return None;
         }
         {
-            // Before every rewind, all flags will be resetted
+            // Before every rewind, all flags will be reset
             self.reset();
             for (index, itr) in self.itrs.iter().enumerate() {
                 if itr.rewind().is_some() {
@@ -177,6 +178,7 @@ impl MergeIterator {
         {
             let heap = self.heap.borrow_mut();
             if heap.is_empty() {
+                // TODO: why can the heap be empty here?
                 self.set_iter_empty();
                 return None;
             }
diff --git a/src/y/mod.rs b/src/y/mod.rs
index 99fa580..4394c12 100644
--- a/src/y/mod.rs
+++ b/src/y/mod.rs
@@ -15,8 +15,9 @@ use std::collections::hash_map::DefaultHasher;
 
 use std::fs::{File, OpenOptions};
 use std::hash::Hasher;
-use std::io::{ErrorKind, Write};
+use std::io::{Cursor, ErrorKind, Write};
 
+use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
 use std::backtrace::Backtrace;
 use std::{array, cmp, io};
 use thiserror::Error;
@@ -92,6 +93,10 @@ pub enum Error {
     // GC
     #[error("Stop iteration")]
     StopGC,
+
+    ////////////////////////////////
+    #[error("Transaction Conflict. Please retry.")]
+    TxCommitConflict,
 }
 
 impl Default for Error {
@@ -268,7 +273,7 @@ pub(crate) fn parallel_load_block_key(fp: File, offsets: Vec<u64>) -> Vec<KeyOffset>
 Result<()> {
     Ok(())
 }
 
-pub(crate) fn hex_str(buf: &[u8]) -> String {
-    String::from_utf8(buf.to_vec()).unwrap_or_else(|_| "Sorry, Hex String Failed!!!".to_string())
+pub(crate) fn hex_str<T: AsRef<[u8]>>(buf: T) -> String {
+    String::from_utf8(buf.as_ref().to_vec())
+        .unwrap_or_else(|_| "Sorry, Hex String Failed!!!".to_string())
+}
+
+// Generates a new key by appending ts to key.
+#[inline(always)]
+pub(crate) fn key_with_ts(key: &[u8], ts: u64) -> Vec<u8> {
+    let mut out = vec![0u8; key.len() + 8];
+    out[..key.len()].copy_from_slice(key);
+    (&mut out[key.len()..])
+        .write_u64::<BigEndian>(u64::MAX - ts)
+        .unwrap();
+    out
+}
+
+#[inline(always)]
+pub(crate) fn parse_ts(out: &[u8]) -> u64 {
+    if out.len() <= 8 {
+        return 0;
+    }
+    // The timestamp lives in the trailing 8 bytes of the key.
+    let mut cursor = Cursor::new(&out[out.len() - 8..]);
+    u64::MAX - cursor.read_u64::<BigEndian>().unwrap()
+}
+
+#[inline(always)]
+pub(crate) fn parse_key(out: &[u8]) -> &[u8] {
+    if out.len() < 8 {
+        return out;
+    }
+    &out[..(out.len() - 8)]
+}
+
+// Checks for key equality, ignoring the version timestamp suffix.
+#[inline(always)]
+pub(crate) fn same_key_ignore_version<T: AsRef<[u8]>>(src: T, dst: T) -> bool {
+    if src.as_ref().len() != dst.as_ref().len() {
+        return false;
+    }
+    let key_src = parse_key(src.as_ref());
+    let key_dst = parse_key(dst.as_ref());
+    key_src == key_dst
+}
+
+// compare_key compares the key parts without the timestamp, and only compares the
+// timestamps when the key parts are equal.
+// ("a" would sort higher than "aa" if we compared the raw bytes directly.)
+// All keys should have a timestamp suffix.
+#[inline(always)]
+pub(crate) fn compare_key(key1: &[u8], key2: &[u8]) -> Ordering {
+    assert!(key1.len() > 8);
+    assert!(key2.len() > 8);
+    return match parse_key(key1).cmp(&parse_key(key2)) {
+        Ordering::Equal => parse_ts(key1).cmp(&parse_ts(key2)),
+        other => other,
+    };
+}
 
 #[cfg(any(target_os = "macos", target_os = "linux"))]
@@ -375,8 +432,8 @@ fn dsync() {
 
 /// find a value in array with binary search
 pub fn binary_search<T, F>(array: &[T], f: F) -> Option<usize>
-where
-    F: Fn(&T) -> Ordering,
+    where
+        F: Fn(&T) -> Ordering,
 {
     let mut low = 0;
     let mut high = array.len() - 1;
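Finally, a quick sanity check of the key encoding introduced in `src/y/mod.rs`: `key_with_ts` appends an 8-byte big-endian `u64::MAX - ts` suffix, so newer versions of the same user key sort first under plain byte comparison. A test along these lines could sit next to the functions; it assumes `parse_ts` reads the trailing 8 bytes, as above:

```rust
#[test]
fn t_key_with_ts_roundtrip() {
    let k5 = key_with_ts(b"alpha", 5);
    let k9 = key_with_ts(b"alpha", 9);
    // The suffix is u64::MAX - ts, so the newer version sorts first.
    assert!(k9 < k5);
    assert_eq!(parse_key(&k5), b"alpha");
    assert_eq!(parse_ts(&k5), 5);
    assert!(same_key_ignore_version(k5.as_slice(), k9.as_slice()));
}
```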